Skip to content

Commit 4bc723f

Browse files
Merge 1c1b382 into 7b891d8
2 parents 7b891d8 + 1c1b382 commit 4bc723f

File tree

12 files changed

+164
-58
lines changed

12 files changed

+164
-58
lines changed

ydb/core/protos/data_events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ message TEvWrite {
9595
repeated uint32 ColumnIds = 3 [packed = true];
9696
optional uint64 PayloadIndex = 4;
9797
optional EDataFormat PayloadFormat = 5;
98+
optional string PayloadSchema = 6;
9899
}
99100

100101
// Transaction operations

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,5 @@ message TFeatureFlags {
162162
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
163163
optional bool EnableSparsedColumns = 144 [default = false];
164164
optional bool EnableParameterizedDecimal = 145 [default = false];
165+
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
165166
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
163163
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
164164
}
165165
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
166+
Self->SetupIndexation();
166167
}
167168

168169
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWr
3535
NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(),
3636
owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR)),
3737
schema, owner.GetLastTxSnapshot(), owner.Counters.GetCSCounters().WritingCounters);
38-
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
38+
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
3939

4040
Status = EOperationStatus::Started;
4141
}

ydb/core/tx/columnshard/operations/write_data.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,43 @@ bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, cons
1212
}
1313
IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex());
1414
if (proto.HasType()) {
15-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
1615
auto type = TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(proto.GetType());
1716
if (!type) {
17+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
1818
return false;
1919
}
2020
ModificationType = *type;
2121
}
2222

23-
std::vector<ui32> columns;
24-
for (auto&& columnId : proto.GetColumnIds()) {
25-
columns.emplace_back(columnId);
23+
if (proto.HasPayloadSchema()) {
24+
PayloadSchema = NArrow::DeserializeSchema(proto.GetPayloadSchema());
25+
} else {
26+
std::vector<ui32> columns;
27+
for (auto&& columnId : proto.GetColumnIds()) {
28+
columns.emplace_back(columnId);
29+
}
30+
if (columns.empty()) {
31+
BatchSchema = IndexSchema;
32+
} else {
33+
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
34+
}
35+
if (BatchSchema->GetColumnsCount() != columns.size()) {
36+
return false;
37+
}
2638
}
27-
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
2839
OriginalDataSize = IncomingData.size();
29-
return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty();
40+
return !!IncomingData;
3041
}
3142

3243
TConclusion<std::shared_ptr<arrow::RecordBatch>> TArrowData::ExtractBatch() {
3344
Y_ABORT_UNLESS(!!IncomingData);
34-
auto result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
45+
std::shared_ptr<arrow::RecordBatch> result;
46+
if (PayloadSchema) {
47+
result = NArrow::DeserializeBatch(IncomingData, PayloadSchema);
48+
} else {
49+
result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
50+
}
51+
3552
IncomingData = "";
3653
return result;
3754
}

ydb/core/tx/columnshard/operations/write_data.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class TArrowData : public NEvWrite::IDataContainer {
3030
private:
3131
NOlap::ISnapshotSchema::TPtr IndexSchema;
3232
NOlap::ISnapshotSchema::TPtr BatchSchema;
33+
std::shared_ptr<arrow::Schema> PayloadSchema;
3334
TString IncomingData;
3435
NEvWrite::EModificationType ModificationType = NEvWrite::EModificationType::Upsert;
3536
};

ydb/core/tx/data_events/columnshard_splitter.h

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
#pragma once
22

3+
#include "events.h"
34
#include "shards_splitter.h"
5+
#include "payload_helper.h"
46

57
#include <ydb/core/tx/sharding/sharding.h>
68
#include <ydb/core/tx/columnshard/columnshard.h>
79
#include <ydb/core/formats/arrow/size_calcer.h>
810
#include <ydb/core/formats/arrow/arrow_helpers.h>
911
#include <ydb/core/scheme/scheme_types_proto.h>
1012

11-
1213
namespace NKikimr::NEvWrite {
1314

14-
class TColumnShardShardsSplitter : public IShardsSplitter {
15-
class TShardInfo : public IShardInfo {
15+
class TColumnShardShardsSplitter: public IShardsSplitter {
16+
class TShardInfo: public IShardInfo {
1617
private:
1718
const TString SchemaData;
1819
const TString Data;
@@ -23,25 +24,38 @@ class TColumnShardShardsSplitter : public IShardsSplitter {
2324
: SchemaData(schemaData)
2425
, Data(data)
2526
, RowsCount(rowsCount)
26-
, GranuleShardingVersion(granuleShardingVersion)
27-
{}
27+
, GranuleShardingVersion(granuleShardingVersion) {
28+
}
2829

29-
ui64 GetBytes() const override {
30+
virtual ui64 GetBytes() const override {
3031
return Data.size();
3132
}
3233

33-
ui32 GetRowsCount() const override {
34+
virtual ui32 GetRowsCount() const override {
3435
return RowsCount;
3536
}
3637

37-
const TString& GetData() const override {
38+
virtual const TString& GetData() const override {
3839
return Data;
3940
}
4041

41-
void Serialize(TEvWrite& evWrite) const override {
42+
virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const override {
4243
evWrite.SetArrowData(SchemaData, Data);
4344
evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion);
4445
}
46+
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const override {
47+
TPayloadWriter<NEvents::TDataEvents::TEvWrite> writer(evWrite);
48+
TString data = Data;
49+
writer.AddDataToPayload(std::move(data));
50+
51+
auto* operation = evWrite.Record.AddOperations();
52+
operation->SetPayloadSchema(SchemaData);
53+
operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE);
54+
operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_ARROW);
55+
operation->SetPayloadIndex(0);
56+
operation->MutableTableId()->SetTableId(tableId);
57+
operation->MutableTableId()->SetSchemaVersion(schemaVersion);
58+
}
4559
};
4660

4761
private:

ydb/core/tx/data_events/shard_writer.cpp

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77

88
namespace NKikimr::NEvWrite {
99

10-
TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId)
10+
TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite)
1111
: WritesCount(writesCount)
1212
, LongTxActorId(longTxActorId)
13+
, ImmediateWrite(immediateWrite)
1314
, LongTxId(longTxId)
1415
{
1516
Y_ABORT_UNLESS(writesCount);
@@ -39,28 +40,62 @@ namespace NKikimr::NEvWrite {
3940
}
4041
}
4142

42-
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
43-
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType)
43+
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
44+
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
4445
: ShardId(shardId)
4546
, WritePartIdx(writePartIdx)
4647
, TableId(tableId)
48+
, SchemaVersion(schemaVersion)
4749
, DedupId(dedupId)
4850
, DataForShard(data)
4951
, ExternalController(externalController)
5052
, LeaderPipeCache(MakePipePerNodeCacheID(false))
5153
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
5254
, ModificationType(mType)
55+
, ImmediateWrite(immediateWrite)
5356
{
5457
}
5558

59+
void TShardWriter::SendWriteRequest() {
60+
if (ImmediateWrite) {
61+
auto ev = MakeHolder<NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
62+
DataForShard->Serialize(*ev, TableId, SchemaVersion);
63+
SendToTablet(std::move(ev));
64+
} else {
65+
auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
66+
DataForShard->Serialize(*ev);
67+
SendToTablet(std::move(ev));
68+
}
69+
}
70+
5671
void TShardWriter::Bootstrap() {
57-
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
58-
DataForShard->Serialize(*ev);
59-
SendToTablet(std::move(ev));
72+
SendWriteRequest();
6073
Become(&TShardWriter::StateMain);
6174
}
6275

63-
void TShardWriter::Handle(TEvWriteResult::TPtr& ev) {
76+
void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
77+
const auto* msg = ev->Get();
78+
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);
79+
80+
const auto ydbStatus = msg->GetStatus();
81+
if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
82+
if (RetryWriteRequest(true)) {
83+
return;
84+
}
85+
}
86+
87+
auto gPassAway = PassAwayGuard();
88+
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
89+
ExternalController->OnFail(Ydb::StatusIds::INTERNAL_ERROR,
90+
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
91+
ExternalController->GetLongTxId().ToString());
92+
return;
93+
}
94+
95+
ExternalController->OnSuccess(ShardId, 0, WritePartIdx);
96+
}
97+
98+
void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
6499
const auto* msg = ev->Get();
65100
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);
66101

@@ -113,9 +148,7 @@ namespace NKikimr::NEvWrite {
113148
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
114149
} else {
115150
++NumRetries;
116-
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
117-
DataForShard->Serialize(*ev);
118-
SendToTablet(std::move(ev));
151+
SendWriteRequest();
119152
}
120153
return true;
121154
}

ydb/core/tx/data_events/shard_writer.h

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#pragma once
22

3-
#include "shards_splitter.h"
43
#include "common/modification_type.h"
4+
#include "events.h"
5+
#include "shards_splitter.h"
56

67
#include <ydb/library/accessor/accessor.h>
78
#include <ydb/core/base/tablet_pipecache.h>
@@ -89,13 +90,17 @@ class TWritersController {
8990
NActors::TActorIdentity LongTxActorId;
9091
std::vector<TWriteIdForShard> WriteIds;
9192
const TMonotonic StartInstant = TMonotonic::Now();
93+
const bool ImmediateWrite = false;
9294
YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId);
9395
YDB_READONLY(std::shared_ptr<TCSUploadCounters>, Counters, std::make_shared<TCSUploadCounters>());
9496
void SendReply() {
9597
if (FailsCount.Val()) {
9698
Counters->OnFailedFullReply(TMonotonic::Now() - StartInstant);
9799
AFL_VERIFY(Code);
98100
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(*Code, Issues));
101+
} else if (ImmediateWrite) {
102+
Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant);
103+
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(Ydb::StatusIds::SUCCESS));
99104
} else {
100105
Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant);
101106
auto req = MakeHolder<NLongTxService::TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId);
@@ -129,7 +134,7 @@ class TWritersController {
129134

130135
};
131136

132-
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId);
137+
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite);
133138
void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId);
134139
void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message);
135140
};
@@ -144,14 +149,17 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
144149
const ui64 ShardId;
145150
const ui64 WritePartIdx;
146151
const ui64 TableId;
152+
const ui64 SchemaVersion;
147153
const TString DedupId;
148154
const IShardInfo::TPtr DataForShard;
149155
ui32 NumRetries = 0;
150156
TWritersController::TPtr ExternalController;
151157
const TActorId LeaderPipeCache;
152158
NWilson::TProfileSpan ActorSpan;
153159
EModificationType ModificationType;
160+
const bool ImmediateWrite = false;
154161

162+
void SendWriteRequest();
155163
static TDuration OverloadTimeout() {
156164
return TDuration::MilliSeconds(OverloadedDelayMs);
157165
}
@@ -164,21 +172,24 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
164172
TBase::PassAway();
165173
}
166174
public:
167-
TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
168-
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType);
175+
TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
176+
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx,
177+
const EModificationType mType, const bool immediateWrite);
169178

170179
STFUNC(StateMain) {
171180
switch (ev->GetTypeRewrite()) {
172-
hFunc(TEvWriteResult, Handle);
181+
hFunc(TEvColumnShard::TEvWriteResult, Handle);
173182
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
183+
hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
174184
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
175185
}
176186
}
177187

178188
void Bootstrap();
179189

180-
void Handle(TEvWriteResult::TPtr& ev);
190+
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev);
181191
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
192+
void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev);
182193
void HandleTimeout(const TActorContext& ctx);
183194
private:
184195
bool RetryWriteRequest(const bool delayed = true);

ydb/core/tx/data_events/shards_splitter.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111

1212
namespace NKikimr::NEvWrite {
1313

14-
using TEvWrite = TEvColumnShard::TEvWrite;
15-
using TEvWriteResult = TEvColumnShard::TEvWriteResult;
16-
1714
class IShardsSplitter {
1815
public:
1916
using TPtr = std::shared_ptr<IShardsSplitter>;
@@ -43,7 +40,8 @@ class IShardsSplitter {
4340
using TPtr = std::shared_ptr<IShardInfo>;
4441
virtual ~IShardInfo() {}
4542

46-
virtual void Serialize(TEvWrite& evWrite) const = 0;
43+
virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const = 0;
44+
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const = 0;
4745
virtual ui64 GetBytes() const = 0;
4846
virtual ui32 GetRowsCount() const = 0;
4947
virtual const TString& GetData() const = 0;
@@ -68,13 +66,21 @@ class IShardsSplitter {
6866

6967
TYdbConclusionStatus SplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) {
7068
TableId = schemeEntry.TableId.PathId.LocalPathId;
69+
AFL_VERIFY(schemeEntry.ColumnTableInfo);
70+
AFL_VERIFY(schemeEntry.ColumnTableInfo->Description.HasSchema());
71+
SchemaVersion = schemeEntry.ColumnTableInfo->Description.GetSchema().GetVersion();
72+
AFL_VERIFY(SchemaVersion);
7173
return DoSplitData(schemeEntry, data);
7274
}
7375

7476
ui64 GetTableId() const {
7577
return TableId;
7678
}
7779

80+
ui64 GetSchemaVersion() const {
81+
return SchemaVersion;
82+
}
83+
7884
const TFullSplitData& GetSplitData() const {
7985
Y_ABORT_UNLESS(FullSplitData);
8086
return *FullSplitData;
@@ -86,6 +92,7 @@ class IShardsSplitter {
8692
virtual TYdbConclusionStatus DoSplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) = 0;
8793

8894
ui64 TableId = 0;
95+
ui64 SchemaVersion = 0;
8996
protected:
9097
std::optional<TFullSplitData> FullSplitData;
9198
};

0 commit comments

Comments
 (0)