diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 36b5743a93ff..c3b151fae403 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -53,7 +53,7 @@ message TActorSystemConfig { enum EActorSystemProfile { DEFAULT = 1; - LOW_CPU_CONSUMPTION = 2; + LOW_CPU_CONSUMPTION = 2; LOW_LATENCY = 3; } @@ -1432,6 +1432,7 @@ message TColumnShardConfig { optional bool TTLEnabled = 6 [default = true]; optional bool WritingEnabled = 7 [default = true]; optional uint32 WritingBufferDurationMs = 8 [default = 0]; + optional bool UseChunkedMergeOnCompaction = 9 [default = false]; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 6c3c9caed410..08e4fad95c66 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -16,28 +16,36 @@ namespace NKikimr::NOlap::NCompaction { -TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { +/* Builds AppendedPortions from whole merged batches. Restores the source portions from SwitchedPortions and Blobs (Blobs is cleared right after), converts each portion's batch with resultSchema->NormalizeBatch where resultSchema is the last schema version, feeds the batches into a TMergePartialStream, drains it with DrainAllParts, and hands every resulting batch to MakeAppendedPortions. Guarded by Y_ABORT_UNLESS on an empty merge result and on an empty MakeAppendedPortions result. NOTE(review): the declarations `std::vector portions` and `std::vector> batchResults` appear to have lost their template arguments in this rendering of the patch - confirm against the real file. */ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept { std::vector portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs); Blobs.clear(); - i64 portionsSize = 0; - i64 portionsCount = 0; - i64 insertedPortionsSize = 0; - i64 compactedPortionsSize = 0; - i64 otherPortionsSize = 0; - for (auto&& i : SwitchedPortions) { - if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) { - insertedPortionsSize += i.GetBlobBytes(); - } else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) { - compactedPortionsSize += i.GetBlobBytes(); - } else { - otherPortionsSize += i.GetBlobBytes(); + std::vector> batchResults; + auto resultSchema = context.SchemaVersions.GetLastSchema(); + { + auto resultDataSchema = 
resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(); + /* Merge stream is constructed with the index's replace key and the result Arrow schema; the meaning of the trailing `false` argument is not visible here - TODO confirm. */ NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), resultDataSchema, false); + for (auto&& i : portions) { + auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot()); + auto batch = i.GetBatch(dataSchema, *resultSchema); + batch = resultSchema->NormalizeBatch(*dataSchema, batch); + /* Each source batch is checked with IsSortedAndUnique against the replace key before it is added; presumably this check is active only in debug builds - verify the macro. */ Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey())); + mergeStream.AddSource(batch, nullptr); } - portionsSize += i.GetBlobBytes(); - ++portionsCount; + /* CheckPoints is passed to DrainAllParts - presumably the positions at which the merged output is split into separate batches; verify against TMergePartialStream. */ batchResults = mergeStream.DrainAllParts(CheckPoints, resultDataSchema->fields()); } - NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize); - NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize); + /* The merge is required to produce at least one batch. */ Y_ABORT_UNLESS(batchResults.size()); + for (auto&& b : batchResults) { + /* NOTE(review): this loop-local `portions` shadows the function-level `portions` declared at the top of the function. */ auto portions = MakeAppendedPortions(b, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), GranuleMeta.get(), context); + Y_ABORT_UNLESS(portions.size()); + for (auto& portion : portions) { + AppendedPortions.emplace_back(std::move(portion)); + } + } +} +void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept { + std::vector portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs); + Blobs.clear(); static const TString portionIdFieldName = "$$__portion_id"; static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; static const std::shared_ptr portionIdField = std::make_shared(portionIdFieldName, std::make_shared()); @@ -192,6 +200,34 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc recordIdx += slice.GetRecordsCount(); } } +} + +TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + i64 portionsSize = 0; + i64 
portionsCount = 0; + i64 insertedPortionsSize = 0; + i64 compactedPortionsSize = 0; + i64 otherPortionsSize = 0; + for (auto&& i : SwitchedPortions) { + if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) { + insertedPortionsSize += i.GetBlobBytes(); + } else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) { + compactedPortionsSize += i.GetBlobBytes(); + } else { + otherPortionsSize += i.GetBlobBytes(); + } + portionsSize += i.GetBlobBytes(); + ++portionsCount; + } + NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize); + NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize); + + if (AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) { + BuildAppendedPortionsByChunks(context); + } else { + BuildAppendedPortionsByFullBatches(context); + } + if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { TStringBuilder sbSwitched; sbSwitched << ""; diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index e107aa185363..811eab38aa60 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -9,6 +9,8 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { using TBase = TCompactColumnEngineChanges; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; std::map CheckPoints; + void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept; + void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept; protected: virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override; virtual TPortionMeta::EProduced GetResultProducedClass() const override {