Skip to content

Commit b8fbb10

Browse files
Merge 03f1b29 into fa53e40
2 parents fa53e40 + 03f1b29 commit b8fbb10

24 files changed

+405
-472
lines changed

ydb/core/formats/arrow/common/accessor.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,16 @@ class IChunkedArray {
9191

9292
template <class TChunkAccessor>
9393
TCurrentChunkAddress SelectChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
94-
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
94+
if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
9595
ui32 startIndex = 0;
9696
ui64 idx = 0;
9797
if (chunkCurrent) {
98-
AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
99-
startIndex = chunkCurrent->GetChunkIndex() + 1;
100-
idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
98+
if (position < chunkCurrent->GetFinishPosition()) {
99+
return *chunkCurrent;
100+
}
101+
AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
102+
startIndex = chunkCurrent->GetChunkIndex();
103+
idx = chunkCurrent->GetStartPosition();
101104
}
102105
for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
103106
const ui64 nextIdx = idx + accessor.GetChunkLength(i);

ydb/core/kqp/ut/olap/sys_view_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
242242
ui64 rawBytes2;
243243
ui64 bytes2;
244244
helper.GetVolumes(rawBytes2, bytes2, false, {"new_column_ui64"});
245-
AFL_VERIFY(rawBytes2 == 6500041)("real", rawBytes2);
245+
AFL_VERIFY(rawBytes2 == 6500023)("real", rawBytes2);
246246
AFL_VERIFY(bytes2 == 45360)("b", bytes2);
247247
}
248248
}

ydb/core/protos/config.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1497,7 +1497,6 @@ message TColumnShardConfig {
14971497
optional bool TTLEnabled = 6 [default = true];
14981498
optional bool WritingEnabled = 7 [default = true];
14991499
optional uint32 WritingBufferDurationMs = 8 [default = 0];
1500-
optional bool UseChunkedMergeOnCompaction = 9 [default = true];
15011500
optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
15021501
optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
15031502
message TIndexMetadataMemoryLimit {

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp

Lines changed: 18 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,31 @@
44
namespace NKikimr::NOlap::NCompaction {
55

66
bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
7-
Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
87
Y_ABORT_UNLESS(RecordIndexStart);
9-
ui32 currentStartPortionIdx = *RecordIndexStart;
10-
ui32 currentFinishPortionIdx = RecordIndexFinish;
118
// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
12-
while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
13-
if (!NextChunk()) {
14-
return false;
15-
}
9+
if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) {
10+
11+
} else {
12+
CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart);
1613
}
1714

18-
ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition;
19-
while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
20-
const ui32 currentFinish = CurrentChunkRecordsCount;
21-
// if (currentStart == 0 && CurrentColumnChunk) {
22-
// column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
23-
// } else {
24-
column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
25-
// }
26-
currentStart = 0;
27-
if (!NextChunk()) {
28-
return false;
15+
ui32 currentStart = *RecordIndexStart;
16+
while (RecordIndexFinish >= CurrentChunk->GetFinishPosition()) {
17+
column.AppendSlice(
18+
CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), CurrentChunk->GetFinishPosition() - currentStart);
19+
currentStart = CurrentChunk->GetFinishPosition();
20+
if (currentStart < BlobChunks->GetRecordsCount()) {
21+
CurrentChunk = BlobChunks->GetChunk(CurrentChunk, currentStart);
22+
} else {
23+
CurrentChunk.reset();
24+
break;
2925
}
3026
}
3127

32-
const ui32 currentFinish = currentFinishPortionIdx - ChunkRecordIndexStartPosition;
33-
if (currentStart < currentFinish) {
34-
Y_ABORT_UNLESS(currentFinish < CurrentChunkRecordsCount);
35-
column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
28+
if (currentStart < RecordIndexFinish) {
29+
AFL_VERIFY(CurrentChunk);
30+
Y_ABORT_UNLESS(RecordIndexFinish < CurrentChunk->GetFinishPosition());
31+
column.AppendSlice(CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), RecordIndexFinish - currentStart);
3632
}
3733

3834
RecordIndexStart.reset();
@@ -41,7 +37,6 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
4137
}
4238

4339
bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& column) {
44-
Y_ABORT_UNLESS(ChunkRecordIndexStartPosition <= portionRecordIdx);
4540
if (!RecordIndexStart) {
4641
RecordIndexStart = portionRecordIdx;
4742
RecordIndexFinish = portionRecordIdx + 1;
@@ -55,29 +50,4 @@ bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& colu
5550
return true;
5651
}
5752

58-
bool TPortionColumnCursor::NextChunk() {
59-
CurrentArray = nullptr;
60-
if (++ChunkIdx == ColumnChunks.size()) {
61-
return false;
62-
} else {
63-
ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
64-
CurrentBlobChunk = BlobChunks[ChunkIdx];
65-
CurrentColumnChunk = ColumnChunks[ChunkIdx];
66-
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
67-
return true;
68-
}
69-
}
70-
71-
const std::shared_ptr<arrow::Array>& TPortionColumnCursor::GetCurrentArray() {
72-
Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
73-
Y_ABORT_UNLESS(CurrentBlobChunk);
74-
75-
if (!CurrentArray) {
76-
auto res = NArrow::TStatusValidator::GetValid(ColumnLoader->Apply(CurrentBlobChunk->GetData()));
77-
AFL_VERIFY(res->num_columns() == 1);
78-
CurrentArray = res->column(0);
79-
}
80-
return CurrentArray;
81-
}
82-
8353
}

ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,46 +9,25 @@ namespace NKikimr::NOlap::NCompaction {
99

1010
class TPortionColumnCursor {
1111
private:
12-
std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks;
13-
std::vector<const TColumnRecord*> ColumnChunks;
12+
std::optional<NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress> CurrentChunk;
13+
std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
1414
std::optional<ui32> RecordIndexStart;
1515
YDB_READONLY(ui32, RecordIndexFinish, 0);
16-
ui32 ChunkRecordIndexStartPosition = 0;
17-
ui32 ChunkIdx = 0;
18-
std::shared_ptr<IPortionDataChunk> CurrentBlobChunk;
19-
const TColumnRecord* CurrentColumnChunk = nullptr;
20-
ui32 CurrentChunkRecordsCount = 0;
21-
std::shared_ptr<arrow::Array> CurrentArray;
22-
std::shared_ptr<TColumnLoader> ColumnLoader;
2316
const ui64 PortionId;
2417

25-
const std::shared_ptr<arrow::Array>& GetCurrentArray();
26-
27-
bool NextChunk();
28-
2918
public:
3019
~TPortionColumnCursor() {
31-
AFL_VERIFY(!RecordIndexStart || ChunkIdx == ColumnChunks.size())("chunk", ChunkIdx)
32-
("size", ColumnChunks.size())("start", RecordIndexStart)("finish", RecordIndexFinish)
33-
("max", CurrentBlobChunk->GetRecordsCount())("current_start_position", ChunkRecordIndexStartPosition);
20+
AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
3421
}
3522

3623
bool Next(const ui32 portionRecordIdx, TMergedColumn& column);
3724

3825
bool Fetch(TMergedColumn& column);
3926

40-
TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
27+
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
4128
: BlobChunks(columnChunks)
42-
, ColumnChunks(records)
43-
, ColumnLoader(loader)
4429
, PortionId(portionId) {
45-
AFL_VERIFY(ColumnLoader);
4630
Y_UNUSED(PortionId);
47-
Y_ABORT_UNLESS(BlobChunks.size());
48-
Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
49-
CurrentBlobChunk = BlobChunks.front();
50-
CurrentColumnChunk = ColumnChunks.front();
51-
CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
5231
}
5332
};
5433

ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ ui32 TColumnPortion::AppendSlice(const std::shared_ptr<arrow::Array>& a, const u
3838
Y_ABORT_UNLESS(length);
3939
Y_ABORT_UNLESS(CurrentPortionRecords < Context.GetPortionRowsCountLimit());
4040
Y_ABORT_UNLESS(startIndex + length <= a->length());
41+
AFL_VERIFY(Type->id() == a->type_id())("own", Type->ToString())("a", a->type()->ToString());
4142
ui32 i = startIndex;
4243
const ui32 packedRecordSize = Context.GetColumnStat() ? Context.GetColumnStat()->GetPackedRecordSize() : 0;
4344
for (; i < startIndex + length; ++i) {

ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class TColumnPortion: public TColumnPortionResult {
4444
private:
4545
using TBase = TColumnPortionResult;
4646
std::unique_ptr<arrow::ArrayBuilder> Builder;
47+
std::shared_ptr<arrow::DataType> Type;
4748
const TColumnMergeContext& Context;
4849
YDB_READONLY(ui64, CurrentChunkRawSize, 0);
4950
double PredictedPackedBytes = 0;
@@ -55,6 +56,7 @@ class TColumnPortion: public TColumnPortionResult {
5556
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
5657
{
5758
Builder = Context.MakeBuilder();
59+
Type = Builder->type();
5860
}
5961

6062
bool IsFullPortion() const {

ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class TColumnMergeContext {
2323
YDB_READONLY(bool, UseWholeChunksOptimization, true);
2424

2525
std::optional<TColumnSerializationStat> ColumnStat;
26+
2627
const TIndexInfo& IndexInfo;
2728
public:
2829
ISnapshotSchema::TPtr GetSchemaInfo() const {
@@ -41,8 +42,9 @@ class TColumnMergeContext {
4142
return IndexInfo;
4243
}
4344

44-
TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit, const ui32 chunkRawBytesLimit,
45-
const std::optional<TColumnSerializationStat>& columnStat)
45+
TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit,
46+
const ui32 chunkRawBytesLimit, const std::optional<TColumnSerializationStat>& columnStat,
47+
const NArrow::NSerialization::TSerializerContainer& overrideSerializer)
4648
: ColumnId(columnId)
4749
, SchemaInfo(schema)
4850
, Saver(schema->GetColumnSaver(columnId))
@@ -52,10 +54,12 @@ class TColumnMergeContext {
5254
, ChunkRawBytesLimit(chunkRawBytesLimit)
5355
, UseWholeChunksOptimization(!schema->GetIndexInfo().GetReplaceKey()->GetFieldByName(ResultField->name()))
5456
, ColumnStat(columnStat)
55-
, IndexInfo(schema->GetIndexInfo())
56-
{
57+
, IndexInfo(schema->GetIndexInfo()) {
5758
Y_ABORT_UNLESS(PortionRowsCountLimit);
5859
Y_ABORT_UNLESS(ChunkRawBytesLimit);
60+
if (!!overrideSerializer) {
61+
Saver.ResetSerializer(overrideSerializer);
62+
}
5963
}
6064
};
6165

0 commit comments

Comments
 (0)