Skip to content

Commit 076a801

Browse files
immediate write for bulk upsert (#9489)
1 parent 51770b3 commit 076a801

26 files changed

+282
-130
lines changed

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
123123
appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true);
124124
ServerSettings->SetAppConfig(appConfig);
125125
ServerSettings->SetFeatureFlags(settings.FeatureFlags);
126+
ServerSettings->FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
126127
ServerSettings->SetNodeCount(settings.NodeCount);
127128
ServerSettings->SetEnableKqpSpilling(enableSpilling);
128129
ServerSettings->SetEnableDataColumnForIndexTable(true);

ydb/core/kqp/ut/olap/indexes_ut.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
8080

8181
{
8282
auto alterQuery = TStringBuilder() <<
83-
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
83+
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
8484
auto session = tableClient.CreateSession().GetValueSync().GetSession();
8585
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
8686
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
@@ -336,13 +336,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
336336
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
337337
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
338338
}
339-
{
340-
auto alterQuery = TStringBuilder() <<
341-
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
342-
auto session = tableClient.CreateSession().GetValueSync().GetSession();
343-
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
344-
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
345-
}
346339

347340
std::vector<TString> uids;
348341
std::vector<TString> resourceIds;

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8172,7 +8172,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
81728172
csController->WaitCompactions(TDuration::Seconds(5));
81738173
}
81748174

8175-
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]");
8175+
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` ORDER BY value", "[[#];[#];[[42u]];[[43u]]]");
81768176
}
81778177

81788178
Y_UNIT_TEST(DropThenAddColumn) {

ydb/core/protos/data_events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ message TEvWrite {
9595
repeated uint32 ColumnIds = 3 [packed = true];
9696
optional uint64 PayloadIndex = 4;
9797
optional EDataFormat PayloadFormat = 5;
98+
optional string PayloadSchema = 6;
9899
}
99100

100101
// Transaction operations

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,5 @@ message TFeatureFlags {
162162
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
163163
optional bool EnableSparsedColumns = 144 [default = false];
164164
optional bool EnableParameterizedDecimal = 145 [default = false];
165+
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
165166
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,11 @@
66
namespace NKikimr::NColumnShard {
77

88
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
9-
NKikimrTxColumnShard::TLogicalMetadata meta;
10-
meta.SetNumRows(batch->GetRowsCount());
11-
meta.SetRawBytes(batch->GetRawBytes());
12-
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
13-
meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe());
14-
meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe());
15-
16-
const auto& blobRange = batch.GetRange();
17-
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
9+
auto userData = batch.BuildInsertionUserData(*Self);
10+
NOlap::TInsertedData insertData(writeId, userData);
1811

19-
// First write wins
2012
TBlobGroupSelector dsGroupSelector(Self->Info());
2113
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
22-
23-
const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
24-
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
25-
*meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto();
26-
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
27-
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);
28-
29-
auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
30-
NOlap::TInsertedData insertData(writeId, userData);
3114
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
3215
if (ok) {
3316
Self->UpdateInsertTableCounters();
@@ -36,6 +19,18 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
3619
return false;
3720
}
3821

22+
bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
23+
auto userData = batch.BuildInsertionUserData(*Self);
24+
TBlobGroupSelector dsGroupSelector(Self->Info());
25+
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
26+
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
27+
if (Self->TablesManager.HasTable(userData->GetPathId())) {
28+
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
29+
}
30+
Self->UpdateInsertTableCounters();
31+
return true;
32+
}
33+
3934
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
4035
TMemoryProfileGuard mpg("TTxWrite::Execute");
4136
NActors::TLogContextGuard logGuard =
@@ -65,10 +60,17 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
6560
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
6661
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
6762
for (auto&& i : aggr->GetSplittedBlobs()) {
68-
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
69-
aggr->AddInsertWriteId(insertWriteId);
70-
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
71-
"size", aggr->GetSplittedBlobs().size());
63+
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
64+
static TAtomicCounter Counter = 0;
65+
const TInsertWriteId insertWriteId = (TInsertWriteId)Counter.Inc();
66+
AFL_VERIFY(CommitOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
67+
"size", aggr->GetSplittedBlobs().size());
68+
} else {
69+
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
70+
aggr->AddInsertWriteId(insertWriteId);
71+
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
72+
"size", aggr->GetSplittedBlobs().size());
73+
}
7274
}
7375
}
7476
}
@@ -92,8 +94,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
9294
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
9395
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
9496
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
95-
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
96-
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
9797
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
9898
NKikimrTxColumnShard::TCommitWriteTxBody proto;
9999
proto.SetLockId(operation->GetLockId());
@@ -156,13 +156,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
156156
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
157157
}
158158
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
159+
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
159160
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
160161
}
161162
}
162163
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
163164
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
164165
}
165166
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
167+
Self->SetupIndexation();
166168
}
167169

168170
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
2020
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
2121
const ui32 TabletTxNo;
2222

23+
bool CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);
24+
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);
25+
2326
class TReplyInfo {
2427
private:
2528
std::unique_ptr<NActors::IEventBase> Event;
@@ -43,8 +46,6 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
4346
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;
4447

4548

46-
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);
47-
4849
TStringBuilder TxPrefix() const {
4950
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
5051
}

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,10 @@ class TColumnShard
544544
public:
545545
ui64 TabletTxCounter = 0;
546546

547+
const TTablesManager& GetTablesManager() const {
548+
return TablesManager;
549+
}
550+
547551
bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
548552
return LongTxWrites.contains(insertWriteId);
549553
}

ydb/core/tx/columnshard/engines/insert_table/committed.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ class TCommittedData: public TUserDataContainer {
2525
, DedupId(dedupId) {
2626
}
2727

28-
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId)
28+
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId)
2929
: TBase(userData)
3030
, Snapshot(ss)
31-
, DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) {
31+
, DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) {
3232
}
3333

3434
void SetRemove() {

ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,22 @@ TInsertionSummary::TCounters TInsertTable::Commit(
5757
return counters;
5858
}
5959

60+
TInsertionSummary::TCounters TInsertTable::CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data) {
61+
TInsertionSummary::TCounters counters;
62+
counters.Rows += data.GetMeta().GetNumRows();
63+
counters.RawBytes += data.GetMeta().GetRawBytes();
64+
counters.Bytes += data.BlobSize();
65+
66+
AddBlobLink(data.GetBlobRange().BlobId);
67+
const ui64 pathId = data.GetPathId();
68+
auto& pathInfo = Summary.GetPathInfo(pathId);
69+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", pathId)("blob_range", data.GetBlobRange().ToString());
70+
dbTable.Commit(data);
71+
pathInfo.AddCommitted(std::move(data));
72+
73+
return counters;
74+
}
75+
6076
void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds) {
6177
Y_ABORT_UNLESS(!writeIds.empty());
6278

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ class TInsertTable: public TInsertTableAccessor {
9898
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
9999
TInsertionSummary::TCounters Commit(
100100
IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet<TInsertWriteId>& writeIds, std::function<bool(ui64)> pathExists);
101+
TInsertionSummary::TCounters CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data);
101102
void Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds);
102103
void MarkAsNotAbortable(const TInsertWriteId writeId) {
103104
Summary.MarkAsNotAbortable(writeId);

ydb/core/tx/columnshard/engines/insert_table/inserted.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace NKikimr::NOlap {
77

8-
TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) {
8+
TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) const {
99
return TCommittedData(UserData, planStep, txId, InsertWriteId);
1010
}
1111

ydb/core/tx/columnshard/engines/insert_table/inserted.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class TInsertedData: public TUserDataContainer {
2929
/// One of them wins and becomes committed. Original DedupId would be lost then.
3030
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
3131
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
32-
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId);
32+
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId) const;
3333
};
3434

3535
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
8585
break;
8686
}
8787
}
88-
if (MergingContext->IsExclusiveInterval() && sourcesInMemory) {
88+
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
89+
sourcesInMemory) {
8990
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
9091
auto& container = Sources.begin()->second->GetStageResult().GetBatch();
9192
if (container && container->num_rows()) {

ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
451451
engine.Load(db);
452452

453453
std::vector<TCommittedData> dataToIndex = {
454-
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), (TInsertWriteId)2),
455-
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), (TInsertWriteId)1)
454+
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), 0, (TInsertWriteId)2),
455+
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), 0, (TInsertWriteId)1)
456456
};
457457

458458
// write
@@ -553,7 +553,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
553553
std::vector<TCommittedData> dataToIndex;
554554
TSnapshot ss(planStep, txId);
555555
dataToIndex.push_back(
556-
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
556+
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));
557557

558558
bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
559559
UNIT_ASSERT(ok);
@@ -651,7 +651,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
651651
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
652652
std::vector<TCommittedData> dataToIndex;
653653
TSnapshot ss(planStep, txId);
654-
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
654+
dataToIndex.push_back(
655+
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));
655656

656657
bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
657658
blobsAll.Merge(std::move(blobs));
@@ -682,7 +683,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
682683
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
683684
std::vector<TCommittedData> dataToIndex;
684685
TSnapshot ss(planStep, txId);
685-
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
686+
dataToIndex.push_back(
687+
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));
686688

687689
bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
688690
UNIT_ASSERT(ok);
@@ -730,7 +732,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
730732
TSnapshot ss(planStep, txId);
731733
std::vector<TCommittedData> dataToIndex;
732734
dataToIndex.push_back(
733-
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
735+
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));
734736

735737
bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
736738
UNIT_ASSERT(ok);

ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
#include "indexed_blob_constructor.h"
22

3-
#include <ydb/core/tx/columnshard/defs.h>
43
#include <ydb/core/tx/columnshard/blob.h>
4+
#include <ydb/core/tx/columnshard/columnshard_impl.h>
55
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
6-
6+
#include <ydb/core/tx/columnshard/defs.h>
77

88
namespace NKikimr::NOlap {
99

10-
TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
10+
TIndexedWriteController::TIndexedWriteController(
11+
const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
1112
: Buffer(action, std::move(aggregations))
12-
, DstActor(dstActor)
13-
{
13+
, DstActor(dstActor) {
1414
auto blobs = Buffer.GroupIntoBlobs();
1515
for (auto&& b : blobs) {
1616
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.ExtractBlobData(), action));
@@ -33,6 +33,26 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) {
3333
Range.BlobId = id;
3434
}
3535

36+
std::shared_ptr<NKikimr::NOlap::TUserData> TWideSerializedBatch::BuildInsertionUserData(const NColumnShard::TColumnShard& owner) const {
37+
NKikimrTxColumnShard::TLogicalMetadata meta;
38+
meta.SetNumRows(SplittedBlobs.GetRowsCount());
39+
meta.SetRawBytes(SplittedBlobs.GetRawBytes());
40+
meta.SetDirtyWriteTimeSeconds(GetStartInstant().Seconds());
41+
meta.SetSpecialKeysRawData(SplittedBlobs.GetSpecialKeysFullSafe());
42+
meta.SetSpecialKeysPayloadData(SplittedBlobs.GetSpecialKeysPayloadSafe());
43+
44+
const auto& blobRange = Range;
45+
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
46+
47+
const auto& writeMeta = GetAggregation().GetWriteMeta();
48+
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
49+
*meta.MutableSchemaSubset() = GetAggregation().GetSchemaSubset().SerializeToProto();
50+
auto schemeVersion = GetAggregation().GetSchemaVersion();
51+
auto tableSchema = owner.GetTablesManager().GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);
52+
53+
return std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), SplittedBlobs.GetData());
54+
}
55+
3656
void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
3757
for (auto&& aggr : Aggregations) {
3858
aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
@@ -89,4 +109,4 @@ TString TWritingBlob::ExtractBlobData() {
89109
return result;
90110
}
91111

92-
}
112+
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)