1
1
#include " merger.h"
2
2
3
- #include " column_cursor.h"
4
- #include " column_portion_chunk.h"
5
- #include " merge_context.h"
6
- #include " merged_column.h"
3
+ #include " abstract/merger.h"
4
+ #include " plain/logic.h"
7
5
8
6
#include < ydb/core/formats/arrow/reader/merger.h>
7
+ #include < ydb/core/formats/arrow/serializer/native.h>
9
8
#include < ydb/core/formats/arrow/simple_builder/array.h>
10
9
#include < ydb/core/formats/arrow/simple_builder/filler.h>
11
- #include < ydb/core/formats/arrow/serializer/native.h>
12
10
#include < ydb/core/tx/columnshard/splitter/batch_slice.h>
13
11
14
12
namespace NKikimr ::NOlap::NCompaction {
15
13
16
14
std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute (const std::shared_ptr<TSerializationStats>& stats,
17
- const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
18
- const ui64 pathId, const std::optional<ui64> shardingActualVersion) {
15
+ const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const ui64 pathId,
16
+ const std::optional<ui64> shardingActualVersion) {
19
17
AFL_VERIFY (Batches.size () == Filters.size ());
20
- static const TString portionIdFieldName = " $$__portion_id" ;
21
- static const TString portionRecordIndexFieldName = " $$__portion_record_idx" ;
22
- static const std::shared_ptr<arrow::Field> portionIdField =
23
- std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
24
- static const std::shared_ptr<arrow::Field> portionRecordIndexField =
25
- std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());
26
-
27
18
std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
28
19
{
29
20
arrow::FieldVector indexFields;
30
- indexFields.emplace_back (portionIdField );
31
- indexFields.emplace_back (portionRecordIndexField );
21
+ indexFields.emplace_back (IColumnMerger::PortionIdField );
22
+ indexFields.emplace_back (IColumnMerger::PortionRecordIndexField );
32
23
IIndexInfo::AddSpecialFields (indexFields);
33
24
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
34
25
NArrow::NMerger::TMergePartialStream mergeStream (
@@ -39,40 +30,40 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
39
30
{
40
31
NArrow::NConstruction::IArrayBuilder::TPtr column =
41
32
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
42
- portionIdFieldName , idx);
43
- batch->AddField (portionIdField , column->BuildArray (batch->num_rows ())).Validate ();
33
+ IColumnMerger::PortionIdFieldName , idx);
34
+ batch->AddField (IColumnMerger::PortionIdField , column->BuildArray (batch->num_rows ())).Validate ();
44
35
}
45
36
{
46
37
NArrow::NConstruction::IArrayBuilder::TPtr column =
47
38
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(
48
- portionRecordIndexFieldName );
49
- batch->AddField (portionRecordIndexField , column->BuildArray (batch->num_rows ())).Validate ();
39
+ IColumnMerger::PortionRecordIndexFieldName );
40
+ batch->AddField (IColumnMerger::PortionRecordIndexField , column->BuildArray (batch->num_rows ())).Validate ();
50
41
}
51
42
mergeStream.AddSource (batch, Filters[idx]);
52
43
++idx;
53
44
}
54
45
batchResults = mergeStream.DrainAllParts (checkPoints, indexFields);
55
46
}
56
47
57
- std::vector<std::map<ui32, std::vector<NCompaction:: TColumnPortionResult>>> chunkGroups;
48
+ std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
58
49
chunkGroups.resize (batchResults.size ());
59
50
for (auto && columnId : resultFiltered->GetColumnIds ()) {
60
51
NActors::TLogContextGuard logGuard (
61
52
NActors::TLogContextBuilder::Build ()(" field_name" , resultFiltered->GetIndexInfo ().GetColumnName (columnId)));
62
53
auto columnInfo = stats->GetColumnInfo (columnId);
63
54
auto resultField = resultFiltered->GetIndexInfo ().GetColumnFieldVerified (columnId);
55
+ std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
56
+ // resultFiltered->BuildColumnMergerVerified(columnId);
64
57
65
- std::vector<NCompaction::TPortionColumnCursor> cursors;
66
58
{
67
- ui32 idx = 0 ;
59
+ std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> parts ;
68
60
for (auto && p : Batches) {
69
- cursors.emplace_back (NCompaction::TPortionColumnCursor (p->GetColumnVerified (resultFiltered->GetFieldIndex (columnId)), idx));
70
- ++idx;
61
+ parts.emplace_back (p->GetColumnVerified (resultFiltered->GetFieldIndex (columnId)));
71
62
}
63
+
64
+ merger->Start (parts);
72
65
}
73
66
74
- ui32 batchesRecordsCount = 0 ;
75
- ui32 columnRecordsCount = 0 ;
76
67
std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
77
68
ui32 batchIdx = 0 ;
78
69
for (auto && batchResult : batchResults) {
@@ -92,42 +83,10 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
92
83
93
84
NCompaction::TColumnMergeContext context (columnId, resultFiltered, portionRecordsCountLimit,
94
85
NSplitter::TSplitSettings ().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver);
95
- NCompaction::TMergedColumn mColumn (context);
96
-
97
- auto columnPortionIdx = batchResult->GetColumnByName (portionIdFieldName);
98
- auto columnPortionRecordIdx = batchResult->GetColumnByName (portionRecordIndexFieldName);
99
- auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName (TIndexInfo::SPEC_COL_PLAN_STEP);
100
- auto columnSnapshotTxIdx = batchResult->GetColumnByName (TIndexInfo::SPEC_COL_TX_ID);
101
- Y_ABORT_UNLESS (columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
102
- Y_ABORT_UNLESS (columnPortionIdx->type_id () == arrow::UInt16Type::type_id);
103
- Y_ABORT_UNLESS (columnPortionRecordIdx->type_id () == arrow::UInt32Type::type_id);
104
- Y_ABORT_UNLESS (columnSnapshotPlanStepIdx->type_id () == arrow::UInt64Type::type_id);
105
- Y_ABORT_UNLESS (columnSnapshotTxIdx->type_id () == arrow::UInt64Type::type_id);
106
- const arrow::UInt16Array& pIdxArray = static_cast <const arrow::UInt16Array&>(*columnPortionIdx);
107
- const arrow::UInt32Array& pRecordIdxArray = static_cast <const arrow::UInt32Array&>(*columnPortionRecordIdx);
108
-
109
- AFL_VERIFY (batchResult->num_rows () == pIdxArray.length ());
110
- std::optional<ui16> predPortionIdx;
111
- for (ui32 idx = 0 ; idx < pIdxArray.length (); ++idx) {
112
- const ui16 portionIdx = pIdxArray.Value (idx);
113
- const ui32 portionRecordIdx = pRecordIdxArray.Value (idx);
114
- auto & cursor = cursors[portionIdx];
115
- cursor.Next (portionRecordIdx, mColumn );
116
- if (predPortionIdx && portionIdx != *predPortionIdx) {
117
- cursors[*predPortionIdx].Fetch (mColumn );
118
- }
119
- if (idx + 1 == pIdxArray.length ()) {
120
- cursor.Fetch (mColumn );
121
- }
122
- predPortionIdx = portionIdx;
123
- }
124
- chunkGroups[batchIdx][columnId] = mColumn .BuildResult ();
125
- batchesRecordsCount += batchResult->num_rows ();
126
- columnRecordsCount += mColumn .GetRecordsCount ();
127
- AFL_VERIFY (batchResult->num_rows () == mColumn .GetRecordsCount ());
86
+
87
+ chunkGroups[batchIdx][columnId] = merger->Execute (context, batchResult);
128
88
++batchIdx;
129
89
}
130
- AFL_VERIFY (columnRecordsCount == batchesRecordsCount)(" mCount" , columnRecordsCount)(" bCount" , batchesRecordsCount);
131
90
}
132
91
ui32 batchIdx = 0 ;
133
92
@@ -149,6 +108,12 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
149
108
AFL_VERIFY (i.second .size () == columnChunks.begin ()->second .size ())(" first" , columnChunks.begin ()->second .size ())(
150
109
" current" , i.second .size ())(" first_name" , columnChunks.begin ()->first )(" current_name" , i.first );
151
110
}
111
+ auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName (TIndexInfo::SPEC_COL_PLAN_STEP);
112
+ auto columnSnapshotTxIdx = batchResult->GetColumnByName (TIndexInfo::SPEC_COL_TX_ID);
113
+ Y_ABORT_UNLESS (columnSnapshotPlanStepIdx);
114
+ Y_ABORT_UNLESS (columnSnapshotTxIdx);
115
+ Y_ABORT_UNLESS (columnSnapshotPlanStepIdx->type_id () == arrow::UInt64Type::type_id);
116
+ Y_ABORT_UNLESS (columnSnapshotTxIdx->type_id () == arrow::UInt64Type::type_id);
152
117
153
118
std::vector<TGeneralSerializedSlice> batchSlices;
154
119
std::shared_ptr<TDefaultSchemaDetails> schemaDetails (new TDefaultSchemaDetails (resultFiltered, stats));
@@ -191,4 +156,4 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
191
156
return result;
192
157
}
193
158
194
- }
159
+ } // namespace NKikimr::NOlap::NCompaction
0 commit comments