Skip to content

Commit a7f5463

Browse files
separate merger logic for make new one
clean deprecated methods that constructed arrow classes directly
1 parent 51ddf40 commit a7f5463

26 files changed

+230
-186
lines changed

ydb/core/formats/arrow/common/accessor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class IChunkedArray {
109109
}
110110
idx = nextIdx;
111111
}
112-
} else if (position < chunkCurrent->GetStartPosition()) {
112+
} else {
113113
AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0);
114114
ui64 idx = chunkCurrent->GetStartPosition();
115115
for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) {

ydb/core/tx/columnshard/engines/changes/compaction/merge_context.cpp renamed to ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "merge_context.h"
1+
#include "merger.h"
22

33
namespace NKikimr::NOlap::NCompaction {
44

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#pragma once
2+
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
3+
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>
4+
5+
namespace NKikimr::NOlap::NCompaction {
6+
class IColumnMerger {
7+
private:
8+
bool Started = false;
9+
10+
virtual std::vector<TColumnPortionResult> DoExecute(
11+
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) = 0;
12+
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) = 0;
13+
14+
public:
15+
static inline const TString PortionIdFieldName = "$$__portion_id";
16+
static inline const TString PortionRecordIndexFieldName = "$$__portion_record_idx";
17+
static inline const std::shared_ptr<arrow::Field> PortionIdField =
18+
std::make_shared<arrow::Field>(PortionIdFieldName, std::make_shared<arrow::UInt16Type>());
19+
static inline const std::shared_ptr<arrow::Field> PortionRecordIndexField =
20+
std::make_shared<arrow::Field>(PortionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());
21+
22+
virtual ~IColumnMerger() = default;
23+
24+
void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
25+
AFL_VERIFY(!Started);
26+
Started = true;
27+
return DoStart(input);
28+
}
29+
30+
std::vector<TColumnPortionResult> Execute(
31+
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {
32+
return DoExecute(context, remap);
33+
}
34+
};
35+
36+
} // namespace NKikimr::NOlap::NCompaction
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
merger.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/core/tx/columnshard/engines/changes/compaction/common
9+
)
10+
11+
END()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "context.h"
2+
3+
namespace NKikimr::NOlap::NCompaction {
4+
5+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#include "result.h"
2+
#include <util/string/builder.h>
3+
4+
namespace NKikimr::NOlap::NCompaction {
5+
6+
TString TColumnPortionResult::DebugString() const {
7+
return TStringBuilder() << "chunks=" << Chunks.size() << ";";
8+
}
9+
10+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>
3+
4+
namespace NKikimr::NOlap::NCompaction {
5+
6+
class TColumnPortionResult {
7+
protected:
8+
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
9+
const ui32 ColumnId;
10+
public:
11+
12+
TColumnPortionResult(const ui32 columnId)
13+
: ColumnId(columnId) {
14+
15+
}
16+
17+
const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
18+
return Chunks;
19+
}
20+
21+
TString DebugString() const;
22+
23+
};
24+
25+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
context.cpp
5+
result.cpp
6+
)
7+
8+
PEERDIR(
9+
ydb/core/tx/columnshard/engines/scheme
10+
)
11+
12+
END()

ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp

Lines changed: 27 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,25 @@
11
#include "merger.h"
22

3-
#include "column_cursor.h"
4-
#include "column_portion_chunk.h"
5-
#include "merge_context.h"
6-
#include "merged_column.h"
3+
#include "abstract/merger.h"
4+
#include "plain/logic.h"
75

86
#include <ydb/core/formats/arrow/reader/merger.h>
7+
#include <ydb/core/formats/arrow/serializer/native.h>
98
#include <ydb/core/formats/arrow/simple_builder/array.h>
109
#include <ydb/core/formats/arrow/simple_builder/filler.h>
11-
#include <ydb/core/formats/arrow/serializer/native.h>
1210
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
1311

1412
namespace NKikimr::NOlap::NCompaction {
1513

1614
std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared_ptr<TSerializationStats>& stats,
17-
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
18-
const ui64 pathId, const std::optional<ui64> shardingActualVersion) {
15+
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const ui64 pathId,
16+
const std::optional<ui64> shardingActualVersion) {
1917
AFL_VERIFY(Batches.size() == Filters.size());
20-
static const TString portionIdFieldName = "$$__portion_id";
21-
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
22-
static const std::shared_ptr<arrow::Field> portionIdField =
23-
std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
24-
static const std::shared_ptr<arrow::Field> portionRecordIndexField =
25-
std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());
26-
2718
std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
2819
{
2920
arrow::FieldVector indexFields;
30-
indexFields.emplace_back(portionIdField);
31-
indexFields.emplace_back(portionRecordIndexField);
21+
indexFields.emplace_back(IColumnMerger::PortionIdField);
22+
indexFields.emplace_back(IColumnMerger::PortionRecordIndexField);
3223
IIndexInfo::AddSpecialFields(indexFields);
3324
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
3425
NArrow::NMerger::TMergePartialStream mergeStream(
@@ -39,40 +30,40 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
3930
{
4031
NArrow::NConstruction::IArrayBuilder::TPtr column =
4132
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
42-
portionIdFieldName, idx);
43-
batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate();
33+
IColumnMerger::PortionIdFieldName, idx);
34+
batch->AddField(IColumnMerger::PortionIdField, column->BuildArray(batch->num_rows())).Validate();
4435
}
4536
{
4637
NArrow::NConstruction::IArrayBuilder::TPtr column =
4738
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(
48-
portionRecordIndexFieldName);
49-
batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
39+
IColumnMerger::PortionRecordIndexFieldName);
40+
batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
5041
}
5142
mergeStream.AddSource(batch, Filters[idx]);
5243
++idx;
5344
}
5445
batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
5546
}
5647

57-
std::vector<std::map<ui32, std::vector<NCompaction::TColumnPortionResult>>> chunkGroups;
48+
std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
5849
chunkGroups.resize(batchResults.size());
5950
for (auto&& columnId : resultFiltered->GetColumnIds()) {
6051
NActors::TLogContextGuard logGuard(
6152
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
6253
auto columnInfo = stats->GetColumnInfo(columnId);
6354
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
55+
std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
56+
// resultFiltered->BuildColumnMergerVerified(columnId);
6457

65-
std::vector<NCompaction::TPortionColumnCursor> cursors;
6658
{
67-
ui32 idx = 0;
59+
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> parts;
6860
for (auto&& p : Batches) {
69-
cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx));
70-
++idx;
61+
parts.emplace_back(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)));
7162
}
63+
64+
merger->Start(parts);
7265
}
7366

74-
ui32 batchesRecordsCount = 0;
75-
ui32 columnRecordsCount = 0;
7667
std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
7768
ui32 batchIdx = 0;
7869
for (auto&& batchResult : batchResults) {
@@ -92,42 +83,10 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
9283

9384
NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit,
9485
NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver);
95-
NCompaction::TMergedColumn mColumn(context);
96-
97-
auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
98-
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
99-
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
100-
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
101-
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
102-
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
103-
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
104-
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
105-
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
106-
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
107-
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
108-
109-
AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
110-
std::optional<ui16> predPortionIdx;
111-
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
112-
const ui16 portionIdx = pIdxArray.Value(idx);
113-
const ui32 portionRecordIdx = pRecordIdxArray.Value(idx);
114-
auto& cursor = cursors[portionIdx];
115-
cursor.Next(portionRecordIdx, mColumn);
116-
if (predPortionIdx && portionIdx != *predPortionIdx) {
117-
cursors[*predPortionIdx].Fetch(mColumn);
118-
}
119-
if (idx + 1 == pIdxArray.length()) {
120-
cursor.Fetch(mColumn);
121-
}
122-
predPortionIdx = portionIdx;
123-
}
124-
chunkGroups[batchIdx][columnId] = mColumn.BuildResult();
125-
batchesRecordsCount += batchResult->num_rows();
126-
columnRecordsCount += mColumn.GetRecordsCount();
127-
AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount());
86+
87+
chunkGroups[batchIdx][columnId] = merger->Execute(context, batchResult);
12888
++batchIdx;
12989
}
130-
AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
13190
}
13291
ui32 batchIdx = 0;
13392

@@ -149,6 +108,12 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
149108
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
150109
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
151110
}
111+
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
112+
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
113+
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
114+
Y_ABORT_UNLESS(columnSnapshotTxIdx);
115+
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
116+
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
152117

153118
std::vector<TGeneralSerializedSlice> batchSlices;
154119
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
@@ -191,4 +156,4 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
191156
return result;
192157
}
193158

194-
}
159+
} // namespace NKikimr::NOlap::NCompaction

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp renamed to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ namespace NKikimr::NOlap::NCompaction {
55

66
bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
77
Y_ABORT_UNLESS(RecordIndexStart);
8-
// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
98
if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) {
109

1110
} else {

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h renamed to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ class TPortionColumnCursor {
1313
std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
1414
std::optional<ui32> RecordIndexStart;
1515
YDB_READONLY(ui32, RecordIndexFinish, 0);
16-
const ui64 PortionId;
17-
1816
public:
1917
~TPortionColumnCursor() {
2018
AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
@@ -24,10 +22,8 @@ class TPortionColumnCursor {
2422

2523
bool Fetch(TMergedColumn& column);
2624

27-
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
28-
: BlobChunks(columnChunks)
29-
, PortionId(portionId) {
30-
Y_UNUSED(PortionId);
25+
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks)
26+
: BlobChunks(columnChunks) {
3127
}
3228
};
3329

Original file line numberDiff line numberDiff line change
@@ -1,45 +1,16 @@
11
#pragma once
2-
#include "merge_context.h"
32
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
4-
#include <ydb/core/tx/columnshard/splitter/chunks.h>
3+
#include <ydb/core/tx/columnshard/counters/splitter.h>
4+
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>
5+
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
56
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
67
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
78
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
8-
#include <ydb/core/tx/columnshard/counters/splitter.h>
99
#include <ydb/core/tx/columnshard/splitter/chunk_meta.h>
10+
#include <ydb/core/tx/columnshard/splitter/chunks.h>
1011

1112
namespace NKikimr::NOlap::NCompaction {
1213

13-
class TColumnPortionResult {
14-
protected:
15-
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
16-
ui64 CurrentPortionRecords = 0;
17-
const ui32 ColumnId;
18-
ui64 PackedSize = 0;
19-
public:
20-
ui64 GetPackedSize() const {
21-
return PackedSize;
22-
}
23-
24-
TColumnPortionResult(const ui32 columnId)
25-
: ColumnId(columnId) {
26-
27-
}
28-
29-
const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
30-
return Chunks;
31-
}
32-
33-
ui64 GetCurrentPortionRecords() const {
34-
return CurrentPortionRecords;
35-
}
36-
37-
TString DebugString() const {
38-
return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";";
39-
}
40-
41-
};
42-
4314
class TColumnPortion: public TColumnPortionResult {
4415
private:
4516
using TBase = TColumnPortionResult;
@@ -49,12 +20,14 @@ class TColumnPortion: public TColumnPortionResult {
4920
YDB_READONLY(ui64, CurrentChunkRawSize, 0);
5021
double PredictedPackedBytes = 0;
5122
const TSimpleColumnInfo ColumnInfo;
23+
ui64 PackedSize = 0;
24+
ui64 CurrentPortionRecords = 0;
25+
5226
public:
5327
TColumnPortion(const TColumnMergeContext& context)
5428
: TBase(context.GetColumnId())
5529
, Context(context)
56-
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
57-
{
30+
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) {
5831
Builder = Context.MakeBuilder();
5932
Type = Builder->type();
6033
}
@@ -70,4 +43,4 @@ class TColumnPortion: public TColumnPortionResult {
7043
ui32 AppendSlice(const std::shared_ptr<arrow::Array>& a, const ui32 startIndex, const ui32 length);
7144
};
7245

73-
}
46+
} // namespace NKikimr::NOlap::NCompaction

0 commit comments

Comments
 (0)