Skip to content

Separate the merger logic to make adding a new one possible #6931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/common/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class IChunkedArray {
}
idx = nextIdx;
}
} else if (position < chunkCurrent->GetStartPosition()) {
} else {
AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0);
ui64 idx = chunkCurrent->GetStartPosition();
for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "merge_context.h"
#include "merger.h"

namespace NKikimr::NOlap::NCompaction {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>

namespace NKikimr::NOlap::NCompaction {
/// Abstract per-column merge strategy.
///
/// Usage protocol enforced by this base class:
///   1. Start() is called exactly once with the source column data;
///   2. Execute() is then called for each remap batch and returns the merged result for that batch.
///
/// Concrete strategies implement the private DoStart()/DoExecute() hooks; the public wrappers
/// perform the protocol and input validation before delegating.
class IColumnMerger {
private:
    /// Set by Start(). Guards against a second Start() and against Execute() before Start().
    bool Started = false;

    /// Implementation hook of Execute().
    /// Receives the remap columns already extracted and type-checked:
    /// `pIdxArray` is the "$$__portion_id" column, `pRecordIdxArray` is the "$$__portion_record_idx" column.
    virtual std::vector<TColumnPortionResult> DoExecute(
        const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) = 0;

    /// Implementation hook of Start(); receives the same `input` that was passed to Start().
    virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) = 0;

public:
    /// Names and arrow fields of the two service columns every remap batch must contain.
    static inline const TString PortionIdFieldName = "$$__portion_id";
    static inline const TString PortionRecordIndexFieldName = "$$__portion_record_idx";
    static inline const std::shared_ptr<arrow::Field> PortionIdField =
        std::make_shared<arrow::Field>(PortionIdFieldName, std::make_shared<arrow::UInt16Type>());
    static inline const std::shared_ptr<arrow::Field> PortionRecordIndexField =
        std::make_shared<arrow::Field>(PortionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());

    virtual ~IColumnMerger() = default;

    /// Hands the source column data to the strategy. Must be called exactly once; a repeated call aborts.
    /// NOTE(review): presumably `input` holds one chunked array per source portion, indexed by the
    /// portion id stored in the remap batch - confirm against the caller.
    void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
        AFL_VERIFY(!Started);
        Started = true;
        DoStart(input);
    }

    /// Validates the `remap` batch and delegates the merge to DoExecute().
    ///
    /// `remap` must be non-null and contain the UInt16 "$$__portion_id" column and the UInt32
    /// "$$__portion_record_idx" column, both as long as the batch itself.
    /// Aborts if Start() has not been called yet or if any of the above requirements is violated.
    std::vector<TColumnPortionResult> Execute(
        const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {
        // The strategy has no source data until Start() has run, so merging would be meaningless.
        AFL_VERIFY(Started);
        // Fail with a clear verification error instead of dereferencing a null pointer below.
        AFL_VERIFY(!!remap);

        auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName);
        auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
        Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);
        // The static_casts below are only valid after the exact arrow types have been checked.
        Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
        Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
        const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
        const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);

        AFL_VERIFY(remap->num_rows() == pIdxArray.length());
        AFL_VERIFY(remap->num_rows() == pRecordIdxArray.length());

        return DoExecute(context, pIdxArray, pRecordIdxArray);
    }
};

} // namespace NKikimr::NOlap::NCompaction
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
merger.cpp
)

PEERDIR(
ydb/core/tx/columnshard/engines/changes/compaction/common
)

END()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "context.h"

namespace NKikimr::NOlap::NCompaction {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "result.h"
#include <util/string/builder.h>

namespace NKikimr::NOlap::NCompaction {

// Builds a short human-readable description of the result: the number of stored chunks.
TString TColumnPortionResult::DebugString() const {
    TStringBuilder description;
    description << "chunks=" << Chunks.size() << ";";
    return description;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>

namespace NKikimr::NOlap::NCompaction {

// Holds the data chunks built for one column, together with the id of that column.
// State is protected so that derived builders can fill it in directly.
class TColumnPortionResult {
public:
    // Creates an empty result bound to the column with the given id.
    TColumnPortionResult(const ui32 columnId)
        : ColumnId(columnId)
    {
    }

    // Read-only access to the stored chunks.
    const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
        return Chunks;
    }

    // Short textual description for logs; defined out of line.
    TString DebugString() const;

protected:
    // Chunks stored for the column; empty right after construction.
    std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
    // Id of the column this result belongs to; fixed at construction.
    const ui32 ColumnId;
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LIBRARY()

SRCS(
context.cpp
result.cpp
)

PEERDIR(
ydb/core/tx/columnshard/engines/scheme
)

END()
89 changes: 27 additions & 62 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,25 @@
#include "merger.h"

#include "column_cursor.h"
#include "column_portion_chunk.h"
#include "merge_context.h"
#include "merged_column.h"
#include "abstract/merger.h"
#include "plain/logic.h"

#include <ydb/core/formats/arrow/reader/merger.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>

namespace NKikimr::NOlap::NCompaction {

std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared_ptr<TSerializationStats>& stats,
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
const ui64 pathId, const std::optional<ui64> shardingActualVersion) {
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const ui64 pathId,
const std::optional<ui64> shardingActualVersion) {
AFL_VERIFY(Batches.size() == Filters.size());
static const TString portionIdFieldName = "$$__portion_id";
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
static const std::shared_ptr<arrow::Field> portionIdField =
std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
static const std::shared_ptr<arrow::Field> portionRecordIndexField =
std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());

std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
{
arrow::FieldVector indexFields;
indexFields.emplace_back(portionIdField);
indexFields.emplace_back(portionRecordIndexField);
indexFields.emplace_back(IColumnMerger::PortionIdField);
indexFields.emplace_back(IColumnMerger::PortionRecordIndexField);
IIndexInfo::AddSpecialFields(indexFields);
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
NArrow::NMerger::TMergePartialStream mergeStream(
Expand All @@ -39,40 +30,40 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
{
NArrow::NConstruction::IArrayBuilder::TPtr column =
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
portionIdFieldName, idx);
batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate();
IColumnMerger::PortionIdFieldName, idx);
batch->AddField(IColumnMerger::PortionIdField, column->BuildArray(batch->num_rows())).Validate();
}
{
NArrow::NConstruction::IArrayBuilder::TPtr column =
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(
portionRecordIndexFieldName);
batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
IColumnMerger::PortionRecordIndexFieldName);
batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
}
mergeStream.AddSource(batch, Filters[idx]);
++idx;
}
batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
}

std::vector<std::map<ui32, std::vector<NCompaction::TColumnPortionResult>>> chunkGroups;
std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
chunkGroups.resize(batchResults.size());
for (auto&& columnId : resultFiltered->GetColumnIds()) {
NActors::TLogContextGuard logGuard(
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
auto columnInfo = stats->GetColumnInfo(columnId);
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
// resultFiltered->BuildColumnMergerVerified(columnId);

std::vector<NCompaction::TPortionColumnCursor> cursors;
{
ui32 idx = 0;
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> parts;
for (auto&& p : Batches) {
cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx));
++idx;
parts.emplace_back(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)));
}

merger->Start(parts);
}

ui32 batchesRecordsCount = 0;
ui32 columnRecordsCount = 0;
std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
ui32 batchIdx = 0;
for (auto&& batchResult : batchResults) {
Expand All @@ -92,42 +83,10 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c

NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit,
NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver);
NCompaction::TMergedColumn mColumn(context);

auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);

AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
std::optional<ui16> predPortionIdx;
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
const ui16 portionIdx = pIdxArray.Value(idx);
const ui32 portionRecordIdx = pRecordIdxArray.Value(idx);
auto& cursor = cursors[portionIdx];
cursor.Next(portionRecordIdx, mColumn);
if (predPortionIdx && portionIdx != *predPortionIdx) {
cursors[*predPortionIdx].Fetch(mColumn);
}
if (idx + 1 == pIdxArray.length()) {
cursor.Fetch(mColumn);
}
predPortionIdx = portionIdx;
}
chunkGroups[batchIdx][columnId] = mColumn.BuildResult();
batchesRecordsCount += batchResult->num_rows();
columnRecordsCount += mColumn.GetRecordsCount();
AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount());

chunkGroups[batchIdx][columnId] = merger->Execute(context, batchResult);
++batchIdx;
}
AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
}
ui32 batchIdx = 0;

Expand All @@ -149,6 +108,12 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
Y_ABORT_UNLESS(columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);

std::vector<TGeneralSerializedSlice> batchSlices;
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
Expand Down Expand Up @@ -191,4 +156,4 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
return result;
}

}
} // namespace NKikimr::NOlap::NCompaction
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace NKikimr::NOlap::NCompaction {

bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
Y_ABORT_UNLESS(RecordIndexStart);
// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) {

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ class TPortionColumnCursor {
std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
std::optional<ui32> RecordIndexStart;
YDB_READONLY(ui32, RecordIndexFinish, 0);
const ui64 PortionId;

public:
~TPortionColumnCursor() {
AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
Expand All @@ -24,10 +22,8 @@ class TPortionColumnCursor {

bool Fetch(TMergedColumn& column);

TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
: BlobChunks(columnChunks)
, PortionId(portionId) {
Y_UNUSED(PortionId);
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks)
: BlobChunks(columnChunks) {
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,16 @@
#pragma once
#include "merge_context.h"
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/splitter/chunk_meta.h>
#include <ydb/core/tx/columnshard/splitter/chunks.h>

namespace NKikimr::NOlap::NCompaction {

class TColumnPortionResult {
protected:
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
ui64 CurrentPortionRecords = 0;
const ui32 ColumnId;
ui64 PackedSize = 0;
public:
ui64 GetPackedSize() const {
return PackedSize;
}

TColumnPortionResult(const ui32 columnId)
: ColumnId(columnId) {

}

const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
return Chunks;
}

ui64 GetCurrentPortionRecords() const {
return CurrentPortionRecords;
}

TString DebugString() const {
return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";";
}

};

class TColumnPortion: public TColumnPortionResult {
private:
using TBase = TColumnPortionResult;
Expand All @@ -49,12 +20,14 @@ class TColumnPortion: public TColumnPortionResult {
YDB_READONLY(ui64, CurrentChunkRawSize, 0);
double PredictedPackedBytes = 0;
const TSimpleColumnInfo ColumnInfo;
ui64 PackedSize = 0;
ui64 CurrentPortionRecords = 0;

public:
TColumnPortion(const TColumnMergeContext& context)
: TBase(context.GetColumnId())
, Context(context)
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
{
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) {
Builder = Context.MakeBuilder();
Type = Builder->type();
}
Expand All @@ -70,4 +43,4 @@ class TColumnPortion: public TColumnPortionResult {
ui32 AppendSlice(const std::shared_ptr<arrow::Array>& a, const ui32 startIndex, const ui32 length);
};

}
} // namespace NKikimr::NOlap::NCompaction
Loading
Loading