Skip to content

Commit cd7b9e1

Browse files
correction
1 parent a7f5463 commit cd7b9e1

File tree

3 files changed

+18
-15
lines changed

3 files changed

+18
-15
lines changed

ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class IColumnMerger {
88
bool Started = false;
99

1010
virtual std::vector<TColumnPortionResult> DoExecute(
11-
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) = 0;
11+
const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) = 0;
1212
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) = 0;
1313

1414
public:
@@ -29,7 +29,19 @@ class IColumnMerger {
2929

3030
std::vector<TColumnPortionResult> Execute(
3131
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {
32-
return DoExecute(context, remap);
32+
33+
auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName);
34+
auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
35+
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);
36+
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
37+
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
38+
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
39+
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
40+
41+
AFL_VERIFY(remap->num_rows() == pIdxArray.length());
42+
AFL_VERIFY(remap->num_rows() == pRecordIdxArray.length());
43+
44+
return DoExecute(context, pIdxArray, pRecordIdxArray);
3345
}
3446
};
3547

ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.cpp

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,9 @@ void TPlainMerger::DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::
99
}
1010

1111
std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TPlainMerger::DoExecute(
12-
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {
12+
const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) {
1313
NCompaction::TMergedColumn mColumn(context);
1414

15-
auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName);
16-
auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
17-
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);
18-
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
19-
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
20-
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
21-
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
22-
23-
AFL_VERIFY(remap->num_rows() == pIdxArray.length());
2415
std::optional<ui16> predPortionIdx;
2516
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
2617
const ui16 portionIdx = pIdxArray.Value(idx);
@@ -35,7 +26,7 @@ std::vector<NKikimr::NOlap::NCompaction::TColumnPortionResult> TPlainMerger::DoE
3526
}
3627
predPortionIdx = portionIdx;
3728
}
38-
AFL_VERIFY(remap->num_rows() == mColumn.GetRecordsCount());
29+
AFL_VERIFY(pIdxArray.length() == mColumn.GetRecordsCount());
3930
return mColumn.BuildResult();
4031
}
4132

ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ class TPlainMerger: public IColumnMerger {
1010
std::vector<NCompaction::TPortionColumnCursor> Cursors;
1111
virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) override;
1212

13-
virtual std::vector<TColumnPortionResult> DoExecute(
14-
const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) override;
13+
virtual std::vector<TColumnPortionResult> DoExecute(const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray,
14+
const arrow::UInt32Array& pRecordIdxArray) override;
1515

1616
public:
1717
};

0 commit comments

Comments
 (0)