Skip to content

Commit c2dc821

Browse files
Use chunked merge optionally and use full-batches merge for norm… (#972)
1 parent 5af3641 commit c2dc821

File tree

3 files changed

+57
-18
lines changed

3 files changed

+57
-18
lines changed

ydb/core/protos/config.proto

+2 -1
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,7 @@ message TActorSystemConfig {
5353

5454
enum EActorSystemProfile {
5555
DEFAULT = 1;
56-
LOW_CPU_CONSUMPTION = 2;
56+
LOW_CPU_CONSUMPTION = 2;
5757
LOW_LATENCY = 3;
5858
}
5959

@@ -1432,6 +1432,7 @@ message TColumnShardConfig {
14321432
optional bool TTLEnabled = 6 [default = true];
14331433
optional bool WritingEnabled = 7 [default = true];
14341434
optional uint32 WritingBufferDurationMs = 8 [default = 0];
1435+
optional bool UseChunkedMergeOnCompaction = 9 [default = false];
14351436
}
14361437

14371438
message TSchemeShardConfig {

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

+53 -17
Original file line number | Diff line number | Diff line change
@@ -16,28 +16,36 @@
1616

1717
namespace NKikimr::NOlap::NCompaction {
1818

19-
TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
19+
void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept {
2020
std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
2121
Blobs.clear();
22-
i64 portionsSize = 0;
23-
i64 portionsCount = 0;
24-
i64 insertedPortionsSize = 0;
25-
i64 compactedPortionsSize = 0;
26-
i64 otherPortionsSize = 0;
27-
for (auto&& i : SwitchedPortions) {
28-
if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) {
29-
insertedPortionsSize += i.GetBlobBytes();
30-
} else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) {
31-
compactedPortionsSize += i.GetBlobBytes();
32-
} else {
33-
otherPortionsSize += i.GetBlobBytes();
22+
std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
23+
auto resultSchema = context.SchemaVersions.GetLastSchema();
24+
{
25+
auto resultDataSchema = resultSchema->GetIndexInfo().ArrowSchemaWithSpecials();
26+
NIndexedReader::TMergePartialStream mergeStream(resultSchema->GetIndexInfo().GetReplaceKey(), resultDataSchema, false);
27+
for (auto&& i : portions) {
28+
auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot());
29+
auto batch = i.GetBatch(dataSchema, *resultSchema);
30+
batch = resultSchema->NormalizeBatch(*dataSchema, batch);
31+
Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
32+
mergeStream.AddSource(batch, nullptr);
3433
}
35-
portionsSize += i.GetBlobBytes();
36-
++portionsCount;
34+
batchResults = mergeStream.DrainAllParts(CheckPoints, resultDataSchema->fields());
3735
}
38-
NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize);
39-
NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize);
36+
Y_ABORT_UNLESS(batchResults.size());
37+
for (auto&& b : batchResults) {
38+
auto portions = MakeAppendedPortions(b, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), GranuleMeta.get(), context);
39+
Y_ABORT_UNLESS(portions.size());
40+
for (auto& portion : portions) {
41+
AppendedPortions.emplace_back(std::move(portion));
42+
}
43+
}
44+
}
4045

46+
void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept {
47+
std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
48+
Blobs.clear();
4149
static const TString portionIdFieldName = "$$__portion_id";
4250
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
4351
static const std::shared_ptr<arrow::Field> portionIdField = std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
@@ -192,6 +200,34 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
192200
recordIdx += slice.GetRecordsCount();
193201
}
194202
}
203+
}
204+
205+
TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
206+
i64 portionsSize = 0;
207+
i64 portionsCount = 0;
208+
i64 insertedPortionsSize = 0;
209+
i64 compactedPortionsSize = 0;
210+
i64 otherPortionsSize = 0;
211+
for (auto&& i : SwitchedPortions) {
212+
if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::INSERTED) {
213+
insertedPortionsSize += i.GetBlobBytes();
214+
} else if (i.GetMeta().GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED) {
215+
compactedPortionsSize += i.GetBlobBytes();
216+
} else {
217+
otherPortionsSize += i.GetBlobBytes();
218+
}
219+
portionsSize += i.GetBlobBytes();
220+
++portionsCount;
221+
}
222+
NChanges::TGeneralCompactionCounters::OnPortionsKind(insertedPortionsSize, compactedPortionsSize, otherPortionsSize);
223+
NChanges::TGeneralCompactionCounters::OnRepackPortions(portionsCount, portionsSize);
224+
225+
if (AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) {
226+
BuildAppendedPortionsByChunks(context);
227+
} else {
228+
BuildAppendedPortionsByFullBatches(context);
229+
}
230+
195231
if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
196232
TStringBuilder sbSwitched;
197233
sbSwitched << "";

ydb/core/tx/columnshard/engines/changes/general_compaction.h

+2
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,8 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
99
using TBase = TCompactColumnEngineChanges;
1010
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
1111
std::map<NIndexedReader::TSortableBatchPosition, bool> CheckPoints;
12+
void BuildAppendedPortionsByFullBatches(TConstructionContext& context) noexcept;
13+
void BuildAppendedPortionsByChunks(TConstructionContext& context) noexcept;
1214
protected:
1315
virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
1416
virtual TPortionMeta::EProduced GetResultProducedClass() const override {

0 commit comments

Comments
 (0)