unify indexation (for committed data) and compaction #6909

Merged
11 changes: 7 additions & 4 deletions ydb/core/formats/arrow/common/accessor.h
@@ -91,13 +91,16 @@ class IChunkedArray {
 
     template <class TChunkAccessor>
     TCurrentChunkAddress SelectChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
-        if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
+        if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
             ui32 startIndex = 0;
             ui64 idx = 0;
             if (chunkCurrent) {
-                AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
-                startIndex = chunkCurrent->GetChunkIndex() + 1;
-                idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
+                if (position < chunkCurrent->GetFinishPosition()) {
+                    return *chunkCurrent;
+                }
+                AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
+                startIndex = chunkCurrent->GetChunkIndex();
+                idx = chunkCurrent->GetStartPosition();
             }
             for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
                 const ui64 nextIdx = idx + accessor.GetChunkLength(i);
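
A standalone model of the contract the rewritten SelectChunk appears to implement: reuse the cached chunk address while the position still falls inside it, otherwise resume the forward scan from the cached chunk instead of from index zero. This is an illustration under the assumption that GetFinishPosition() equals GetStartPosition() + GetLength(); the ydb types are replaced with plain structs.

// Model of the new SelectChunk contract; not the library API.
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

struct TChunkAddress {
    uint64_t Start = 0;   // global row index of the first record in the chunk
    uint64_t Length = 0;  // record count in the chunk
    uint32_t Index = 0;   // chunk ordinal
    uint64_t Finish() const { return Start + Length; }
};

TChunkAddress SelectChunk(const std::optional<TChunkAddress>& current,
                          const uint64_t position,
                          const std::vector<uint64_t>& chunkLengths) {
    uint32_t startIndex = 0;
    uint64_t idx = 0;
    if (current) {
        assert(position >= current->Start);  // callers only move forward
        if (position < current->Finish()) {
            return *current;                 // fast path: still the same chunk
        }
        startIndex = current->Index;         // resume scan at the cached chunk
        idx = current->Start;
    }
    for (uint32_t i = startIndex; i < chunkLengths.size(); ++i) {
        const uint64_t nextIdx = idx + chunkLengths[i];
        if (position < nextIdx) {
            return {idx, chunkLengths[i], i};
        }
        idx = nextIdx;
    }
    assert(false && "position beyond the last chunk");
    return {};
}

int main() {
    const std::vector<uint64_t> lengths{3, 5, 2};    // rows 0..2, 3..7, 8..9
    const auto a = SelectChunk(std::nullopt, 4, lengths);
    assert(a.Index == 1 && a.Start == 3);
    assert(SelectChunk(a, 6, lengths).Index == 1);   // cached fast path
    assert(SelectChunk(a, 8, lengths).Index == 2);   // forward scan from chunk 1
}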
4 changes: 2 additions & 2 deletions ydb/core/formats/arrow/reader/merger.cpp
@@ -185,13 +185,13 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
     SortHeap.CleanFinished();
 }
 
-std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
+std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const TIntervalPositions& positions,
     const std::vector<std::shared_ptr<arrow::Field>>& resultFields)
 {
     std::vector<std::shared_ptr<arrow::RecordBatch>> result;
     for (auto&& i : positions) {
         TRecordBatchBuilder indexesBuilder(resultFields);
-        DrainCurrentTo(indexesBuilder, i.first, i.second);
+        DrainCurrentTo(indexesBuilder, i.GetPosition(), i.IsIncludedToLeftInterval());
         result.emplace_back(indexesBuilder.Finalize());
         if (result.back()->num_rows() == 0) {
             result.pop_back();
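
A sketch of the split semantics DrainAllParts implements after this change: each boundary (position, includeToLeft) closes the current part, the boundary row itself goes left or right depending on the flag, and empty parts are dropped, mirroring the num_rows() == 0 check above. Plain ints stand in for TSortableBatchPosition; this is an illustration, not the ydb code.

// Illustration of interval-boundary splitting over a sorted sequence.
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

std::vector<std::vector<int>> DrainAllParts(
        const std::vector<int>& sortedRows,
        const std::vector<std::pair<int, bool>>& boundaries) {
    std::vector<std::vector<int>> result;
    std::size_t i = 0;
    for (const auto& [pos, includeLeft] : boundaries) {
        std::vector<int> part;
        while (i < sortedRows.size() &&
               (sortedRows[i] < pos || (includeLeft && sortedRows[i] == pos))) {
            part.push_back(sortedRows[i++]);
        }
        if (!part.empty()) {  // empty parts are dropped, as in the diff
            result.push_back(std::move(part));
        }
    }
    return result;
}

int main() {
    const auto parts = DrainAllParts({1, 2, 2, 3, 5}, {{2, false}, {2, true}, {9, true}});
    assert(parts.size() == 3);
    assert(parts[0] == std::vector<int>({1}));     // strictly below the boundary
    assert(parts[1] == std::vector<int>({2, 2}));  // boundary rows taken left
    assert(parts[2] == std::vector<int>({3, 5}));
}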
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/reader/merger.h
@@ -94,7 +94,7 @@ class TMergePartialStream {
     std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
     bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
     bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
-    std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
+    std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const TIntervalPositions& positions,
         const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
 };
 
72 changes: 72 additions & 0 deletions ydb/core/formats/arrow/reader/position.h
@@ -404,6 +404,78 @@ class TSortableBatchPosition {
 
 };
 
+class TIntervalPosition {
+private:
+    TSortableBatchPosition Position;
+    const bool LeftIntervalInclude;
+public:
+    const TSortableBatchPosition& GetPosition() const {
+        return Position;
+    }
+    bool IsIncludedToLeftInterval() const {
+        return LeftIntervalInclude;
+    }
+    TIntervalPosition(TSortableBatchPosition&& position, const bool leftIntervalInclude)
+        : Position(std::move(position))
+        , LeftIntervalInclude(leftIntervalInclude) {
+    }
+
+    TIntervalPosition(const TSortableBatchPosition& position, const bool leftIntervalInclude)
+        : Position(position)
+        , LeftIntervalInclude(leftIntervalInclude) {
+    }
+
+    bool operator<(const TIntervalPosition& item) const {
+        std::partial_ordering cmp = Position.Compare(item.Position);
+        if (cmp == std::partial_ordering::equivalent) {
+            return (LeftIntervalInclude ? 1 : 0) < (item.LeftIntervalInclude ? 1 : 0);
+        }
+        return cmp == std::partial_ordering::less;
+    }
+
+    NJson::TJsonValue DebugJson() const {
+        NJson::TJsonValue result = NJson::JSON_MAP;
+        result.InsertValue("position", Position.DebugJson());
+        result.InsertValue("include", LeftIntervalInclude);
+        return result;
+    }
+};
+
+class TIntervalPositions {
+private:
+    std::vector<TIntervalPosition> Positions;
+public:
+    bool IsEmpty() const {
+        return Positions.empty();
+    }
+
+    std::vector<TIntervalPosition>::const_iterator begin() const {
+        return Positions.begin();
+    }
+
+    std::vector<TIntervalPosition>::const_iterator end() const {
+        return Positions.end();
+    }
+
+    void AddPosition(TSortableBatchPosition&& position, const bool includeLeftInterval) {
+        TIntervalPosition intervalPosition(std::move(position), includeLeftInterval);
+        if (Positions.size()) {
+            AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
+        }
+        Positions.emplace_back(std::move(intervalPosition));
+    }
+
+    void AddPosition(const TSortableBatchPosition& position, const bool includeLeftInterval) {
+        TIntervalPosition intervalPosition(position, includeLeftInterval);
+        if (Positions.size()) {
+            AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
+        }
+        Positions.emplace_back(std::move(intervalPosition));
+    }
+};
+
 class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly {
 private:
     using TBase = TSortableBatchPosition;
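
A simplified model of the ordering AddPosition enforces: boundaries compare by position first, and at equal positions an exclusive boundary (include = false) sorts before an inclusive one, so the same key may legally occur twice only as {include=false} followed by {include=true}. Assumption: Compare() on the real TSortableBatchPosition behaves like <=> on the scalar key used here.

// Model of TIntervalPosition::operator< over a scalar key.
#include <cassert>
#include <compare>
#include <cstddef>
#include <vector>

struct TIntervalPositionModel {
    int Key;                   // stands in for TSortableBatchPosition
    bool LeftIntervalInclude;  // does the boundary row go to the left interval?

    bool operator<(const TIntervalPositionModel& other) const {
        const std::partial_ordering cmp = (Key <=> other.Key);
        if (cmp == std::partial_ordering::equivalent) {
            // exclusive boundary sorts before inclusive at the same key
            return !LeftIntervalInclude && other.LeftIntervalInclude;
        }
        return cmp == std::partial_ordering::less;
    }
};

int main() {
    const std::vector<TIntervalPositionModel> positions{
        {10, false}, {10, true}, {20, false}};
    for (std::size_t i = 1; i < positions.size(); ++i) {
        assert(positions[i - 1] < positions[i]);  // the invariant AFL_VERIFY checks
    }
}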
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/olap/sys_view_ut.cpp
@@ -242,8 +242,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
         ui64 rawBytes2;
         ui64 bytes2;
         helper.GetVolumes(rawBytes2, bytes2, false, {"new_column_ui64"});
-        AFL_VERIFY(rawBytes2 == 6500041)("real", rawBytes2);
-        AFL_VERIFY(bytes2 == 45360)("b", bytes2);
+        AFL_VERIFY(rawBytes2 == 6500023)("real", rawBytes2);
+        AFL_VERIFY(bytes2 == 38880)("b", bytes2);
     }
 }
 
1 change: 0 additions & 1 deletion ydb/core/protos/config.proto
@@ -1497,7 +1497,6 @@ message TColumnShardConfig {
     optional bool TTLEnabled = 6 [default = true];
     optional bool WritingEnabled = 7 [default = true];
     optional uint32 WritingBufferDurationMs = 8 [default = 0];
-    optional bool UseChunkedMergeOnCompaction = 9 [default = true];
     optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
     optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
     message TIndexMetadataMemoryLimit {
@@ -4,35 +4,31 @@
 namespace NKikimr::NOlap::NCompaction {
 
 bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
-    Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
     Y_ABORT_UNLESS(RecordIndexStart);
-    ui32 currentStartPortionIdx = *RecordIndexStart;
-    ui32 currentFinishPortionIdx = RecordIndexFinish;
-    // NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
-    while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
-        if (!NextChunk()) {
-            return false;
-        }
+    if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) {
+    } else {
+        CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart);
     }
 
-    ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition;
-    while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
-        const ui32 currentFinish = CurrentChunkRecordsCount;
-        // if (currentStart == 0 && CurrentColumnChunk) {
-        //     column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
-        // } else {
-        column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
-        // }
-        currentStart = 0;
-        if (!NextChunk()) {
-            return false;
+    ui32 currentStart = *RecordIndexStart;
+    while (RecordIndexFinish >= CurrentChunk->GetFinishPosition()) {
+        column.AppendSlice(
+            CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), CurrentChunk->GetFinishPosition() - currentStart);
+        currentStart = CurrentChunk->GetFinishPosition();
+        if (currentStart < BlobChunks->GetRecordsCount()) {
+            CurrentChunk = BlobChunks->GetChunk(CurrentChunk, currentStart);
+        } else {
+            CurrentChunk.reset();
+            break;
         }
     }
 
-    const ui32 currentFinish = currentFinishPortionIdx - ChunkRecordIndexStartPosition;
-    if (currentStart < currentFinish) {
-        Y_ABORT_UNLESS(currentFinish < CurrentChunkRecordsCount);
-        column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
+    if (currentStart < RecordIndexFinish) {
+        AFL_VERIFY(CurrentChunk);
+        Y_ABORT_UNLESS(RecordIndexFinish < CurrentChunk->GetFinishPosition());
+        column.AppendSlice(CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), RecordIndexFinish - currentStart);
     }
 
     RecordIndexStart.reset();
@@ -41,7 +37,6 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
 }
 
 bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& column) {
-    Y_ABORT_UNLESS(ChunkRecordIndexStartPosition <= portionRecordIdx);
     if (!RecordIndexStart) {
        RecordIndexStart = portionRecordIdx;
        RecordIndexFinish = portionRecordIdx + 1;
@@ -55,29 +50,4 @@ bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& column) {
     return true;
 }
 
-bool TPortionColumnCursor::NextChunk() {
-    CurrentArray = nullptr;
-    if (++ChunkIdx == ColumnChunks.size()) {
-        return false;
-    } else {
-        ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
-        CurrentBlobChunk = BlobChunks[ChunkIdx];
-        CurrentColumnChunk = ColumnChunks[ChunkIdx];
-        CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
-        return true;
-    }
-}
-
-const std::shared_ptr<arrow::Array>& TPortionColumnCursor::GetCurrentArray() {
-    Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
-    Y_ABORT_UNLESS(CurrentBlobChunk);
-
-    if (!CurrentArray) {
-        auto res = NArrow::TStatusValidator::GetValid(ColumnLoader->Apply(CurrentBlobChunk->GetData()));
-        AFL_VERIFY(res->num_columns() == 1);
-        CurrentArray = res->column(0);
-    }
-    return CurrentArray;
-}
-
 }
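
A standalone model of the rewritten cursor flow: Next() widens the pending range [RecordIndexStart, RecordIndexFinish), and Fetch() copies it out of a chunked array by walking chunk addresses forward, slicing every chunk it crosses. Chunks are modeled as std::vector<std::vector<int>>, so this illustrates the slicing arithmetic only, not the IChunkedArray API.

// Model of fetching a half-open row range out of a chunked column.
#include <cassert>
#include <cstddef>
#include <vector>

struct TChunkedArrayModel {
    std::vector<std::vector<int>> Chunks;

    // Append rows [start, finish) to out, chunk by chunk.
    void FetchTo(std::vector<int>& out, std::size_t start, const std::size_t finish) const {
        std::size_t chunkStart = 0;
        for (const auto& chunk : Chunks) {
            const std::size_t chunkFinish = chunkStart + chunk.size();
            if (start < chunkFinish && chunkStart < finish) {
                const std::size_t from = start - chunkStart;
                const std::size_t to = (finish < chunkFinish ? finish : chunkFinish) - chunkStart;
                out.insert(out.end(), chunk.begin() + from, chunk.begin() + to);
                start = chunkStart + to;
            }
            chunkStart = chunkFinish;
        }
        assert(start == finish);  // the whole requested range was covered
    }
};

int main() {
    const TChunkedArrayModel arr{{{0, 1, 2}, {3, 4, 5, 6, 7}, {8, 9}}};
    std::vector<int> out;
    arr.FetchTo(out, 2, 9);  // crosses all three chunks
    assert(out == std::vector<int>({2, 3, 4, 5, 6, 7, 8}));
}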
@@ -9,46 +9,25 @@ namespace NKikimr::NOlap::NCompaction {
 
 class TPortionColumnCursor {
 private:
-    std::vector<std::shared_ptr<IPortionDataChunk>> BlobChunks;
-    std::vector<const TColumnRecord*> ColumnChunks;
+    std::optional<NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress> CurrentChunk;
+    std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
     std::optional<ui32> RecordIndexStart;
     YDB_READONLY(ui32, RecordIndexFinish, 0);
-    ui32 ChunkRecordIndexStartPosition = 0;
-    ui32 ChunkIdx = 0;
-    std::shared_ptr<IPortionDataChunk> CurrentBlobChunk;
-    const TColumnRecord* CurrentColumnChunk = nullptr;
-    ui32 CurrentChunkRecordsCount = 0;
-    std::shared_ptr<arrow::Array> CurrentArray;
-    std::shared_ptr<TColumnLoader> ColumnLoader;
     const ui64 PortionId;
 
-    const std::shared_ptr<arrow::Array>& GetCurrentArray();
-
-    bool NextChunk();
-
 public:
     ~TPortionColumnCursor() {
-        AFL_VERIFY(!RecordIndexStart || ChunkIdx == ColumnChunks.size())("chunk", ChunkIdx)
-            ("size", ColumnChunks.size())("start", RecordIndexStart)("finish", RecordIndexFinish)
-            ("max", CurrentBlobChunk->GetRecordsCount())("current_start_position", ChunkRecordIndexStartPosition);
+        AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
     }
 
     bool Next(const ui32 portionRecordIdx, TMergedColumn& column);
 
     bool Fetch(TMergedColumn& column);
 
-    TPortionColumnCursor(const std::vector<std::shared_ptr<IPortionDataChunk>>& columnChunks, const std::vector<const TColumnRecord*>& records, const std::shared_ptr<TColumnLoader>& loader, const ui64 portionId)
+    TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
         : BlobChunks(columnChunks)
-        , ColumnChunks(records)
-        , ColumnLoader(loader)
         , PortionId(portionId) {
-        AFL_VERIFY(ColumnLoader);
         Y_UNUSED(PortionId);
-        Y_ABORT_UNLESS(BlobChunks.size());
-        Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
-        CurrentBlobChunk = BlobChunks.front();
-        CurrentColumnChunk = ColumnChunks.front();
-        CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
     }
 };
 
@@ -38,6 +38,7 @@ ui32 TColumnPortion::AppendSlice(const std::shared_ptr<arrow::Array>& a, const u
     Y_ABORT_UNLESS(length);
     Y_ABORT_UNLESS(CurrentPortionRecords < Context.GetPortionRowsCountLimit());
     Y_ABORT_UNLESS(startIndex + length <= a->length());
+    AFL_VERIFY(Type->id() == a->type_id())("own", Type->ToString())("a", a->type()->ToString());
     ui32 i = startIndex;
     const ui32 packedRecordSize = Context.GetColumnStat() ? Context.GetColumnStat()->GetPackedRecordSize() : 0;
     for (; i < startIndex + length; ++i) {
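
The added AFL_VERIFY guards against appending a slice whose physical type differs from the builder's, which would otherwise corrupt the merged column. The same check, expressed directly against the Apache Arrow C++ API (this assumes Arrow is available; Type->id() and a->type_id() are the calls the diff uses):

// Minimal Arrow type-compatibility check mirroring the new guard.
#include <arrow/api.h>
#include <cassert>
#include <memory>

int main() {
    arrow::Int64Builder builder;
    const std::shared_ptr<arrow::DataType> builderType = builder.type();  // int64

    arrow::Int64Builder source;
    (void)source.Append(42);
    const std::shared_ptr<arrow::Array> a = source.Finish().ValueOrDie();

    assert(builderType->id() == a->type_id());  // the guard added in AppendSlice
}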
@@ -44,6 +44,7 @@ class TColumnPortion: public TColumnPortionResult {
 private:
     using TBase = TColumnPortionResult;
     std::unique_ptr<arrow::ArrayBuilder> Builder;
+    std::shared_ptr<arrow::DataType> Type;
     const TColumnMergeContext& Context;
     YDB_READONLY(ui64, CurrentChunkRawSize, 0);
     double PredictedPackedBytes = 0;
@@ -55,6 +56,7 @@
         , ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
     {
         Builder = Context.MakeBuilder();
+        Type = Builder->type();
     }
 
     bool IsFullPortion() const {
@@ -23,6 +23,7 @@ class TColumnMergeContext {
     YDB_READONLY(bool, UseWholeChunksOptimization, true);
 
     std::optional<TColumnSerializationStat> ColumnStat;
+
     const TIndexInfo& IndexInfo;
 public:
     ISnapshotSchema::TPtr GetSchemaInfo() const {
@@ -41,8 +42,9 @@ class TColumnMergeContext {
         return IndexInfo;
     }
 
-    TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit, const ui32 chunkRawBytesLimit,
-        const std::optional<TColumnSerializationStat>& columnStat)
+    TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit,
+        const ui32 chunkRawBytesLimit, const std::optional<TColumnSerializationStat>& columnStat,
+        const NArrow::NSerialization::TSerializerContainer& overrideSerializer)
         : ColumnId(columnId)
         , SchemaInfo(schema)
         , Saver(schema->GetColumnSaver(columnId))
@@ -52,10 +54,12 @@ class TColumnMergeContext {
         , ChunkRawBytesLimit(chunkRawBytesLimit)
         , UseWholeChunksOptimization(!schema->GetIndexInfo().GetReplaceKey()->GetFieldByName(ResultField->name()))
         , ColumnStat(columnStat)
-        , IndexInfo(schema->GetIndexInfo())
-    {
+        , IndexInfo(schema->GetIndexInfo()) {
         Y_ABORT_UNLESS(PortionRowsCountLimit);
         Y_ABORT_UNLESS(ChunkRawBytesLimit);
+        if (!!overrideSerializer) {
+            Saver.ResetSerializer(overrideSerializer);
+        }
     }
 };
 
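
The new overrideSerializer parameter follows an override-only-when-set pattern: the saver keeps the schema-derived serializer unless the caller passes a non-empty container. A sketch of that pattern with TSerializerContainer modeled as a nullable handle (an assumption; the real type, including its operator!, lives in NArrow::NSerialization):

// Model of the explicit-override-wins serializer selection.
#include <cassert>
#include <memory>
#include <string>

struct TSerializerModel {
    std::string Codec;
};

struct TSaverModel {
    std::shared_ptr<TSerializerModel> Serializer =
        std::make_shared<TSerializerModel>(TSerializerModel{"schema-default"});

    void ResetSerializer(const std::shared_ptr<TSerializerModel>& s) {
        Serializer = s;
    }
};

int main() {
    TSaverModel saver;

    std::shared_ptr<TSerializerModel> overrideSerializer;  // empty: keep default
    if (!!overrideSerializer) {
        saver.ResetSerializer(overrideSerializer);
    }
    assert(saver.Serializer->Codec == "schema-default");

    overrideSerializer = std::make_shared<TSerializerModel>(TSerializerModel{"zstd"});
    if (!!overrideSerializer) {  // explicit override wins
        saver.ResetSerializer(overrideSerializer);
    }
    assert(saver.Serializer->Codec == "zstd");
}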