Skip to content

Commit e7fa04e

Browse files
Merge 831f41c into fa53e40
2 parents fa53e40 + 831f41c commit e7fa04e

20 files changed

+375
-420
lines changed

ydb/core/formats/arrow/common/accessor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,13 @@ class IChunkedArray {
9191

9292
template <class TChunkAccessor>
9393
TCurrentChunkAddress SelectChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
94-
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
94+
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
9595
ui32 startIndex = 0;
9696
ui64 idx = 0;
9797
if (chunkCurrent) {
98-
AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
99-
startIndex = chunkCurrent->GetChunkIndex() + 1;
100-
idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
98+
AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
99+
startIndex = chunkCurrent->GetChunkIndex();
100+
idx = chunkCurrent->GetStartPosition();
101101
}
102102
for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
103103
const ui64 nextIdx = idx + accessor.GetChunkLength(i);

ydb/core/protos/config.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1497,7 +1497,6 @@ message TColumnShardConfig {
14971497
optional bool TTLEnabled = 6 [default = true];
14981498
optional bool WritingEnabled = 7 [default = true];
14991499
optional uint32 WritingBufferDurationMs = 8 [default = 0];
1500-
optional bool UseChunkedMergeOnCompaction = 9 [default = true];
15011500
optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
15021501
optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
15031502
message TIndexMetadataMemoryLimit {

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,27 @@
44
namespace NKikimr::NOlap::NCompaction {
55

66
bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
7-
Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
87
Y_ABORT_UNLESS(RecordIndexStart);
9-
ui32 currentStartPortionIdx = *RecordIndexStart;
10-
ui32 currentFinishPortionIdx = RecordIndexFinish;
118
// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
12-
while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
13-
if (!NextChunk()) {
14-
return false;
9+
CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart);
10+
11+
ui32 currentStart = *RecordIndexStart;
12+
while (RecordIndexFinish >= CurrentChunk->GetFinishPosition()) {
13+
column.AppendSlice(
14+
CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), CurrentChunk->GetFinishPosition() - currentStart);
15+
currentStart = CurrentChunk->GetFinishPosition();
16+
if (currentStart < BlobChunks->GetRecordsCount()) {
17+
CurrentChunk = BlobChunks->GetChunk(CurrentChunk, currentStart);
18+
} else {
19+
CurrentChunk.reset();
20+
break;
1521
}
1622
}
1723

18-
ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition;
19-
while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
20-
const ui32 currentFinish = CurrentChunkRecordsCount;
21-
// if (currentStart == 0 && CurrentColumnChunk) {
22-
// column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
23-
// } else {
24-
column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
25-
// }
26-
currentStart = 0;
27-
if (!NextChunk()) {
28-
return false;
29-
}
30-
}
31-
32-
const ui32 currentFinish = currentFinishPortionIdx - ChunkRecordIndexStartPosition;
33-
if (currentStart < currentFinish) {
34-
Y_ABORT_UNLESS(currentFinish < CurrentChunkRecordsCount);
35-
column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
24+
if (currentStart < RecordIndexFinish) {
25+
AFL_VERIFY(CurrentChunk);
26+
Y_ABORT_UNLESS(RecordIndexFinish < CurrentChunk->GetFinishPosition());
27+
column.AppendSlice(CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), RecordIndexFinish - currentStart);
3628
}
3729

3830
RecordIndexStart.reset();
@@ -41,7 +33,6 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
4133
}
4234

4335
bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& column) {
44-
Y_ABORT_UNLESS(ChunkRecordIndexStartPosition <= portionRecordIdx);
4536
if (!RecordIndexStart) {
4637
RecordIndexStart = portionRecordIdx;
4738
RecordIndexFinish = portionRecordIdx + 1;
@@ -55,29 +46,4 @@ bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& colu
5546
return true;
5647
}
5748

58-
bool TPortionColumnCursor::NextChunk() {
59-
CurrentArray = nullptr;
60-
if (++ChunkIdx == ColumnChunks.size()) {
61-
return false;
62-
} else {
63-
ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
64-
CurrentBlobChunk = BlobChunks[ChunkIdx];
65-
CurrentColumnChunk = ColumnChunks[ChunkIdx];
66-
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
67-
return true;
68-
}
69-
}
70-
71-
const std::shared_ptr<arrow::Array>& TPortionColumnCursor::GetCurrentArray() {
72-
Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
73-
Y_ABORT_UNLESS(CurrentBlobChunk);
74-
75-
if (!CurrentArray) {
76-
auto res = NArrow::TStatusValidator::GetValid(ColumnLoader->Apply(CurrentBlobChunk->GetData()));
77-
AFL_VERIFY(res->num_columns() == 1);
78-
CurrentArray = res->column(0);
79-
}
80-
return CurrentArray;
81-
}
82-
8349
}

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,46 +9,25 @@ namespace NKikimr::NOlap::NCompaction {
99

1010
class TPortionColumnCursor {
1111
private:
12-
std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks;
13-
std::vector<const TColumnRecord*> ColumnChunks;
12+
std::optional<NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress> CurrentChunk;
13+
std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
1414
std::optional<ui32> RecordIndexStart;
1515
YDB_READONLY(ui32, RecordIndexFinish, 0);
16-
ui32 ChunkRecordIndexStartPosition = 0;
17-
ui32 ChunkIdx = 0;
18-
std::shared_ptr<IPortionDataChunk> CurrentBlobChunk;
19-
const TColumnRecord* CurrentColumnChunk = nullptr;
20-
ui32 CurrentChunkRecordsCount = 0;
21-
std::shared_ptr<arrow::Array> CurrentArray;
22-
std::shared_ptr<TColumnLoader> ColumnLoader;
2316
const ui64 PortionId;
2417

25-
const std::shared_ptr<arrow::Array>& GetCurrentArray();
26-
27-
bool NextChunk();
28-
2918
public:
3019
~TPortionColumnCursor() {
31-
AFL_VERIFY(!RecordIndexStart || ChunkIdx == ColumnChunks.size())("chunk", ChunkIdx)
32-
("size", ColumnChunks.size())("start", RecordIndexStart)("finish", RecordIndexFinish)
33-
("max", CurrentBlobChunk->GetRecordsCount())("current_start_position", ChunkRecordIndexStartPosition);
20+
AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
3421
}
3522

3623
bool Next(const ui32 portionRecordIdx, TMergedColumn& column);
3724

3825
bool Fetch(TMergedColumn& column);
3926

40-
TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
27+
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
4128
: BlobChunks(columnChunks)
42-
, ColumnChunks(records)
43-
, ColumnLoader(loader)
4429
, PortionId(portionId) {
45-
AFL_VERIFY(ColumnLoader);
4630
Y_UNUSED(PortionId);
47-
Y_ABORT_UNLESS(BlobChunks.size());
48-
Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
49-
CurrentBlobChunk = BlobChunks.front();
50-
CurrentColumnChunk = ColumnChunks.front();
51-
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
5231
}
5332
};
5433

ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ ui32 TColumnPortion::AppendSlice(const std::shared_ptr<arrow::Array>& a, const u
3838
Y_ABORT_UNLESS(length);
3939
Y_ABORT_UNLESS(CurrentPortionRecords < Context.GetPortionRowsCountLimit());
4040
Y_ABORT_UNLESS(startIndex + length <= a->length());
41+
AFL_VERIFY(Type->id() == a->type_id())("own", Type->ToString())("a", a->type()->ToString());
4142
ui32 i = startIndex;
4243
const ui32 packedRecordSize = Context.GetColumnStat() ? Context.GetColumnStat()->GetPackedRecordSize() : 0;
4344
for (; i < startIndex + length; ++i) {

ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class TColumnPortion: public TColumnPortionResult {
4444
private:
4545
using TBase = TColumnPortionResult;
4646
std::unique_ptr<arrow::ArrayBuilder> Builder;
47+
std::shared_ptr<arrow::DataType> Type;
4748
const TColumnMergeContext& Context;
4849
YDB_READONLY(ui64, CurrentChunkRawSize, 0);
4950
double PredictedPackedBytes = 0;
@@ -55,6 +56,7 @@ class TColumnPortion: public TColumnPortionResult {
5556
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
5657
{
5758
Builder = Context.MakeBuilder();
59+
Type = Builder->type();
5860
}
5961

6062
bool IsFullPortion() const {
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
#include "merger.h"
2+
3+
#include "column_cursor.h"
4+
#include "column_portion_chunk.h"
5+
#include "merge_context.h"
6+
#include "merged_column.h"
7+
8+
#include <ydb/core/formats/arrow/reader/merger.h>
9+
#include <ydb/core/formats/arrow/simple_builder/array.h>
10+
#include <ydb/core/formats/arrow/simple_builder/filler.h>
11+
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
12+
13+
namespace NKikimr::NOlap::NCompaction {
14+
15+
std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared_ptr<TSerializationStats>& stats,
16+
const std::map<NArrow::NMerger::TSortableBatchPosition, bool>& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
17+
const ui64 pathId, const std::optional<ui64> shardingActualVersion) {
18+
AFL_VERIFY(Batches.size() == Filters.size());
19+
static const TString portionIdFieldName = "$$__portion_id";
20+
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
21+
static const std::shared_ptr<arrow::Field> portionIdField =
22+
std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
23+
static const std::shared_ptr<arrow::Field> portionRecordIndexField =
24+
std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());
25+
26+
std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
27+
{
28+
arrow::FieldVector indexFields;
29+
indexFields.emplace_back(portionIdField);
30+
indexFields.emplace_back(portionRecordIndexField);
31+
IIndexInfo::AddSpecialFields(indexFields);
32+
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
33+
NArrow::NMerger::TMergePartialStream mergeStream(
34+
resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames());
35+
36+
ui32 idx = 0;
37+
for (auto&& batch : Batches) {
38+
{
39+
NArrow::NConstruction::IArrayBuilder::TPtr column =
40+
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
41+
portionIdFieldName, idx);
42+
batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate();
43+
}
44+
{
45+
NArrow::NConstruction::IArrayBuilder::TPtr column =
46+
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(
47+
portionRecordIndexFieldName);
48+
batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
49+
}
50+
mergeStream.AddSource(batch, Filters[idx]);
51+
++idx;
52+
}
53+
batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
54+
}
55+
56+
std::vector<std::map<ui32, std::vector<NCompaction::TColumnPortionResult>>> chunkGroups;
57+
chunkGroups.resize(batchResults.size());
58+
for (auto&& columnId : resultFiltered->GetColumnIds()) {
59+
NActors::TLogContextGuard logGuard(
60+
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
61+
auto columnInfo = stats->GetColumnInfo(columnId);
62+
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
63+
64+
std::vector<NCompaction::TPortionColumnCursor> cursors;
65+
{
66+
ui32 idx = 0;
67+
for (auto&& p : Batches) {
68+
cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx));
69+
++idx;
70+
}
71+
}
72+
73+
ui32 batchesRecordsCount = 0;
74+
ui32 columnRecordsCount = 0;
75+
std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
76+
ui32 batchIdx = 0;
77+
for (auto&& batchResult : batchResults) {
78+
const ui32 portionRecordsCountLimit =
79+
batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1;
80+
NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit,
81+
NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo);
82+
NCompaction::TMergedColumn mColumn(context);
83+
84+
auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
85+
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
86+
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
87+
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
88+
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
89+
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
90+
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
91+
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
92+
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
93+
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
94+
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
95+
96+
AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
97+
std::optional<ui16> predPortionIdx;
98+
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
99+
const ui16 portionIdx = pIdxArray.Value(idx);
100+
const ui32 portionRecordIdx = pRecordIdxArray.Value(idx);
101+
auto& cursor = cursors[portionIdx];
102+
cursor.Next(portionRecordIdx, mColumn);
103+
if (predPortionIdx && portionIdx != *predPortionIdx) {
104+
cursors[*predPortionIdx].Fetch(mColumn);
105+
}
106+
if (idx + 1 == pIdxArray.length()) {
107+
cursor.Fetch(mColumn);
108+
}
109+
predPortionIdx = portionIdx;
110+
}
111+
chunkGroups[batchIdx][columnId] = mColumn.BuildResult();
112+
batchesRecordsCount += batchResult->num_rows();
113+
columnRecordsCount += mColumn.GetRecordsCount();
114+
AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount());
115+
++batchIdx;
116+
}
117+
AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
118+
}
119+
ui32 batchIdx = 0;
120+
121+
const auto groups =
122+
resultFiltered->GetIndexInfo().GetEntityGroupsByStorageId(IStoragesManager::DefaultStorageId, *SaverContext.GetStoragesManager());
123+
std::vector<TWritePortionInfoWithBlobsResult> result;
124+
for (auto&& columnChunks : chunkGroups) {
125+
auto batchResult = batchResults[batchIdx];
126+
++batchIdx;
127+
Y_ABORT_UNLESS(columnChunks.size());
128+
129+
for (auto&& i : columnChunks) {
130+
if (i.second.size() != columnChunks.begin()->second.size()) {
131+
for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
132+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())(
133+
"p", i.second[p].DebugString());
134+
}
135+
}
136+
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
137+
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
138+
}
139+
140+
std::vector<TGeneralSerializedSlice> batchSlices;
141+
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
142+
143+
for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
144+
THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns;
145+
for (auto&& p : columnChunks) {
146+
portionColumns.emplace(p.first, p.second[i].GetChunks());
147+
}
148+
batchSlices.emplace_back(portionColumns, schemaDetails, Context.Counters.SplitterCounters);
149+
}
150+
TSimilarPacker slicer(NSplitter::TSplitSettings().GetExpectedPortionSize());
151+
auto packs = slicer.Split(batchSlices);
152+
153+
ui32 recordIdx = 0;
154+
for (auto&& i : packs) {
155+
TGeneralSerializedSlice slicePrimary(std::move(i));
156+
auto dataWithSecondary = resultFiltered->GetIndexInfo()
157+
.AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager())
158+
.DetachResult();
159+
TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters);
160+
161+
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
162+
const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true);
163+
auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups),
164+
dataWithSecondary.GetSecondaryInplaceData(), pathId, resultFiltered->GetVersion(), resultFiltered->GetSnapshot(),
165+
SaverContext.GetStoragesManager());
166+
167+
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultFiltered->GetIndexInfo().GetReplaceKey()));
168+
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
169+
constructor.GetPortionConstructor().AddMetadata(*resultFiltered, deletionsCount, primaryKeys, snapshotKeys);
170+
constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
171+
if (shardingActualVersion) {
172+
constructor.GetPortionConstructor().SetShardingVersion(*shardingActualVersion);
173+
}
174+
result.emplace_back(std::move(constructor));
175+
recordIdx += slice.GetRecordsCount();
176+
}
177+
}
178+
return result;
179+
}
180+
181+
}

0 commit comments

Comments
 (0)