From 831f41c901b891fa15174730688bf1bdd47fbc99 Mon Sep 17 00:00:00 2001
From: ivanmorozov333
Date: Sun, 21 Jul 2024 17:33:41 +0300
Subject: [PATCH 1/4] unify indexation (for committed data) and compaction

---
 ydb/core/formats/arrow/common/accessor.h | 8 +-
 ydb/core/protos/config.proto | 1 -
 .../changes/compaction/column_cursor.cpp | 66 +---
 .../changes/compaction/column_cursor.h | 29 +-
 .../compaction/column_portion_chunk.cpp | 1 +
 .../changes/compaction/column_portion_chunk.h | 2 +
 .../engines/changes/compaction/merger.cpp | 181 ++++++++++
 .../engines/changes/compaction/merger.h | 47 +++
 .../engines/changes/compaction/ya.make | 1 +
 .../engines/changes/general_compaction.cpp | 316 ++++--------------
 .../engines/changes/general_compaction.h | 1 -
 .../engines/changes/indexation.cpp | 80 ++---
 .../engines/portions/portion_info.cpp | 15 +-
 .../engines/portions/portion_info.h | 2 +-
 .../engines/portions/read_with_blobs.cpp | 21 +-
 .../engines/portions/read_with_blobs.h | 2 +-
 .../engines/scheme/abstract/index_info.h | 8 +
 .../columnshard/engines/scheme/index_info.cpp | 7 +-
 .../columnshard/engines/scheme/index_info.h | 5 +
 .../engines/scheme/versions/filtered_scheme.h | 2 +-
 20 files changed, 375 insertions(+), 420 deletions(-)
 create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
 create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/merger.h

diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h
index 702f13fcc6f4..cfadfff13a7f 100644
--- a/ydb/core/formats/arrow/common/accessor.h
+++ b/ydb/core/formats/arrow/common/accessor.h
@@ -91,13 +91,13 @@ class IChunkedArray {

     template
     TCurrentChunkAddress SelectChunk(const std::optional& chunkCurrent, const ui64 position, const TChunkAccessor& accessor) const {
-        if (!chunkCurrent || position >= chunkCurrent->GetStartPosition() + chunkCurrent->GetLength()) {
+        if (!chunkCurrent || position >= chunkCurrent->GetStartPosition()) {
             ui32 startIndex = 0;
             ui64 idx = 0;
             if (chunkCurrent) {
-                AFL_VERIFY(chunkCurrent->GetChunkIndex() + 1 < accessor.GetChunksCount());
-                startIndex = chunkCurrent->GetChunkIndex() + 1;
-                idx = chunkCurrent->GetStartPosition() + chunkCurrent->GetLength();
+                AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount());
+                startIndex = chunkCurrent->GetChunkIndex();
+                idx = chunkCurrent->GetStartPosition();
             }
             for (ui32 i = startIndex; i < accessor.GetChunksCount(); ++i) {
                 const ui64 nextIdx = idx + accessor.GetChunkLength(i);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 6697efab5001..06c8662821d4 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1497,7 +1497,6 @@ message TColumnShardConfig {
     optional bool TTLEnabled = 6 [default = true];
     optional bool WritingEnabled = 7 [default = true];
     optional uint32 WritingBufferDurationMs = 8 [default = 0];
-    optional bool UseChunkedMergeOnCompaction = 9 [default = true];
     optional uint64 CompactionMemoryLimit = 10 [default = 536870912];
     optional uint64 TieringsMemoryLimit = 11 [default = 536870912];
     message TIndexMetadataMemoryLimit {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
index cdb81296cf73..c99d6b3200f0 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp
@@ -4,35 +4,27 @@
 namespace NKikimr::NOlap::NCompaction {

 bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
-    Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
     Y_ABORT_UNLESS(RecordIndexStart);
-    ui32 currentStartPortionIdx = *RecordIndexStart;
-    ui32 currentFinishPortionIdx = RecordIndexFinish;
     // NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
-    while (currentStartPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
-        if (!NextChunk()) {
-            return false;
+    CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart);
+
+    ui32 currentStart = *RecordIndexStart;
+    while (RecordIndexFinish >= CurrentChunk->GetFinishPosition()) {
+        column.AppendSlice(
+            CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), CurrentChunk->GetFinishPosition() - currentStart);
+        currentStart = CurrentChunk->GetFinishPosition();
+        if (currentStart < BlobChunks->GetRecordsCount()) {
+            CurrentChunk = BlobChunks->GetChunk(CurrentChunk, currentStart);
+        } else {
+            CurrentChunk.reset();
+            break;
         }
     }
-    ui32 currentStart = currentStartPortionIdx - ChunkRecordIndexStartPosition;
-    while (currentFinishPortionIdx - ChunkRecordIndexStartPosition >= CurrentChunkRecordsCount) {
-        const ui32 currentFinish = CurrentChunkRecordsCount;
-//        if (currentStart == 0 && CurrentColumnChunk) {
-//            column.AppendBlob(CurrentBlobChunk->GetData(), *CurrentColumnChunk);
-//        } else {
-            column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
-//        }
-        currentStart = 0;
-        if (!NextChunk()) {
-            return false;
-        }
-    }
-
-    const ui32 currentFinish = currentFinishPortionIdx - ChunkRecordIndexStartPosition;
-    if (currentStart < currentFinish) {
-        Y_ABORT_UNLESS(currentFinish < CurrentChunkRecordsCount);
-        column.AppendSlice(GetCurrentArray(), currentStart, currentFinish - currentStart);
+    if (currentStart < RecordIndexFinish) {
+        AFL_VERIFY(CurrentChunk);
+        Y_ABORT_UNLESS(RecordIndexFinish < CurrentChunk->GetFinishPosition());
+        column.AppendSlice(CurrentChunk->GetArray(), currentStart - CurrentChunk->GetStartPosition(), RecordIndexFinish - currentStart);
     }

     RecordIndexStart.reset();
@@ -41,7 +33,6 @@ bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
 }

 bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& column) {
-    Y_ABORT_UNLESS(ChunkRecordIndexStartPosition <= portionRecordIdx);
     if (!RecordIndexStart) {
         RecordIndexStart = portionRecordIdx;
         RecordIndexFinish = portionRecordIdx + 1;
@@ -55,29 +46,4 @@ bool TPortionColumnCursor::Next(const ui32 portionRecordIdx, TMergedColumn& colu
     return true;
 }

-bool TPortionColumnCursor::NextChunk() {
-    CurrentArray = nullptr;
-    if (++ChunkIdx == ColumnChunks.size()) {
-        return false;
-    } else {
-        ChunkRecordIndexStartPosition += CurrentChunkRecordsCount;
-        CurrentBlobChunk = BlobChunks[ChunkIdx];
-        CurrentColumnChunk = ColumnChunks[ChunkIdx];
-        CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
-        return true;
-    }
-}
-
-const std::shared_ptr& TPortionColumnCursor::GetCurrentArray() {
-    Y_ABORT_UNLESS(ChunkIdx < ColumnChunks.size());
-    Y_ABORT_UNLESS(CurrentBlobChunk);
-
-    if (!CurrentArray) {
-        auto res = NArrow::TStatusValidator::GetValid(ColumnLoader->Apply(CurrentBlobChunk->GetData()));
-        AFL_VERIFY(res->num_columns() == 1);
-        CurrentArray = res->column(0);
-    }
-    return CurrentArray;
-}
-
 }
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
index 493cd6268f6a..3274201f229a 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h
@@ -9,46 +9,25 @@ namespace NKikimr::NOlap::NCompaction {

 class TPortionColumnCursor {
 private:
-    std::vector> BlobChunks;
-    std::vector ColumnChunks;
+    std::optional CurrentChunk;
+    std::shared_ptr BlobChunks;
     std::optional RecordIndexStart;
     YDB_READONLY(ui32, RecordIndexFinish, 0);
-    ui32 ChunkRecordIndexStartPosition = 0;
-    ui32 ChunkIdx = 0;
-    std::shared_ptr CurrentBlobChunk;
-    const TColumnRecord* CurrentColumnChunk = nullptr;
-    ui32 CurrentChunkRecordsCount = 0;
-    std::shared_ptr CurrentArray;
-    std::shared_ptr ColumnLoader;
     const ui64 PortionId;

-    const std::shared_ptr& GetCurrentArray();
-
-    bool NextChunk();
-
 public:
     ~TPortionColumnCursor() {
-        AFL_VERIFY(!RecordIndexStart || ChunkIdx == ColumnChunks.size())("chunk", ChunkIdx)
-            ("size", ColumnChunks.size())("start", RecordIndexStart)("finish", RecordIndexFinish)
-            ("max", CurrentBlobChunk->GetRecordsCount())("current_start_position", ChunkRecordIndexStartPosition);
+        AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
     }

     bool Next(const ui32 portionRecordIdx, TMergedColumn& column);

     bool Fetch(TMergedColumn& column);

-    TPortionColumnCursor(const std::vector>& columnChunks, const std::vector& records, const std::shared_ptr& loader, const ui64 portionId)
+    TPortionColumnCursor(const std::shared_ptr& columnChunks, const ui64 portionId)
         : BlobChunks(columnChunks)
-        , ColumnChunks(records)
-        , ColumnLoader(loader)
         , PortionId(portionId) {
-        AFL_VERIFY(ColumnLoader);
         Y_UNUSED(PortionId);
-        Y_ABORT_UNLESS(BlobChunks.size());
-        Y_ABORT_UNLESS(ColumnChunks.size() == BlobChunks.size());
-        CurrentBlobChunk = BlobChunks.front();
-        CurrentColumnChunk = ColumnChunks.front();
-        CurrentChunkRecordsCount = CurrentBlobChunk->GetRecordsCountVerified();
     }
 };
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
index 09eed586ac20..1cd921676f01 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp
@@ -38,6 +38,7 @@ ui32 TColumnPortion::AppendSlice(const std::shared_ptr& a, const u
     Y_ABORT_UNLESS(length);
     Y_ABORT_UNLESS(CurrentPortionRecords < Context.GetPortionRowsCountLimit());
     Y_ABORT_UNLESS(startIndex + length <= a->length());
+    AFL_VERIFY(Type->id() == a->type_id())("own", Type->ToString())("a", a->type()->ToString());
     ui32 i = startIndex;
     const ui32 packedRecordSize = Context.GetColumnStat() ?
Context.GetColumnStat()->GetPackedRecordSize() : 0; for (; i < startIndex + length; ++i) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h index f1d4cbadd6cf..d176781bbf5c 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h @@ -44,6 +44,7 @@ class TColumnPortion: public TColumnPortionResult { private: using TBase = TColumnPortionResult; std::unique_ptr Builder; + std::shared_ptr Type; const TColumnMergeContext& Context; YDB_READONLY(ui64, CurrentChunkRawSize, 0); double PredictedPackedBytes = 0; @@ -55,6 +56,7 @@ class TColumnPortion: public TColumnPortionResult { , ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) { Builder = Context.MakeBuilder(); + Type = Builder->type(); } bool IsFullPortion() const { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp new file mode 100644 index 000000000000..5cbb99229d27 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -0,0 +1,181 @@ +#include "merger.h" + +#include "column_cursor.h" +#include "column_portion_chunk.h" +#include "merge_context.h" +#include "merged_column.h" + +#include +#include +#include +#include + +namespace NKikimr::NOlap::NCompaction { + +std::vector TMerger::Execute(const std::shared_ptr& stats, + const std::map& checkPoints, const std::shared_ptr& resultFiltered, + const ui64 pathId, const std::optional shardingActualVersion) { + AFL_VERIFY(Batches.size() == Filters.size()); + static const TString portionIdFieldName = "$$__portion_id"; + static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; + static const std::shared_ptr portionIdField = + std::make_shared(portionIdFieldName, std::make_shared()); + static const std::shared_ptr portionRecordIndexField = + std::make_shared(portionRecordIndexFieldName, std::make_shared()); + + std::vector> batchResults; + { + arrow::FieldVector indexFields; + indexFields.emplace_back(portionIdField); + indexFields.emplace_back(portionRecordIndexField); + IIndexInfo::AddSpecialFields(indexFields); + auto dataSchema = std::make_shared(indexFields); + NArrow::NMerger::TMergePartialStream mergeStream( + resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames()); + + ui32 idx = 0; + for (auto&& batch : Batches) { + { + NArrow::NConstruction::IArrayBuilder::TPtr column = + std::make_shared>>( + portionIdFieldName, idx); + batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate(); + } + { + NArrow::NConstruction::IArrayBuilder::TPtr column = + std::make_shared>>( + portionRecordIndexFieldName); + batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); + } + mergeStream.AddSource(batch, Filters[idx]); + ++idx; + } + batchResults = mergeStream.DrainAllParts(checkPoints, indexFields); + } + + std::vector>> chunkGroups; + chunkGroups.resize(batchResults.size()); + for (auto&& columnId : resultFiltered->GetColumnIds()) { + NActors::TLogContextGuard logGuard( + NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId))); + auto columnInfo = stats->GetColumnInfo(columnId); + auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId); + 
+ std::vector cursors; + { + ui32 idx = 0; + for (auto&& p : Batches) { + cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx)); + ++idx; + } + } + + ui32 batchesRecordsCount = 0; + ui32 columnRecordsCount = 0; + std::map> columnChunks; + ui32 batchIdx = 0; + for (auto&& batchResult : batchResults) { + const ui32 portionRecordsCountLimit = + batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1; + NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit, + NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo); + NCompaction::TMergedColumn mColumn(context); + + auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); + auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); + auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); + Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); + Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); + Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); + Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); + const arrow::UInt16Array& pIdxArray = static_cast(*columnPortionIdx); + const arrow::UInt32Array& pRecordIdxArray = static_cast(*columnPortionRecordIdx); + + AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); + std::optional predPortionIdx; + for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { + const ui16 portionIdx = pIdxArray.Value(idx); + const ui32 portionRecordIdx = pRecordIdxArray.Value(idx); + auto& cursor = cursors[portionIdx]; + cursor.Next(portionRecordIdx, mColumn); + if (predPortionIdx && portionIdx != *predPortionIdx) { + cursors[*predPortionIdx].Fetch(mColumn); + } + if (idx + 1 == pIdxArray.length()) { + cursor.Fetch(mColumn); + } + predPortionIdx = portionIdx; + } + chunkGroups[batchIdx][columnId] = mColumn.BuildResult(); + batchesRecordsCount += batchResult->num_rows(); + columnRecordsCount += mColumn.GetRecordsCount(); + AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount()); + ++batchIdx; + } + AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount); + } + ui32 batchIdx = 0; + + const auto groups = + resultFiltered->GetIndexInfo().GetEntityGroupsByStorageId(IStoragesManager::DefaultStorageId, *SaverContext.GetStoragesManager()); + std::vector result; + for (auto&& columnChunks : chunkGroups) { + auto batchResult = batchResults[batchIdx]; + ++batchIdx; + Y_ABORT_UNLESS(columnChunks.size()); + + for (auto&& i : columnChunks) { + if (i.second.size() != columnChunks.begin()->second.size()) { + for (ui32 p = 0; p < std::min(columnChunks.begin()->second.size(), i.second.size()); ++p) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())( + "p", i.second[p].DebugString()); + } + } + AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( + "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); + } + + std::vector batchSlices; 
+ std::shared_ptr schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats)); + + for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { + THashMap>> portionColumns; + for (auto&& p : columnChunks) { + portionColumns.emplace(p.first, p.second[i].GetChunks()); + } + batchSlices.emplace_back(portionColumns, schemaDetails, Context.Counters.SplitterCounters); + } + TSimilarPacker slicer(NSplitter::TSplitSettings().GetExpectedPortionSize()); + auto packs = slicer.Split(batchSlices); + + ui32 recordIdx = 0; + for (auto&& i : packs) { + TGeneralSerializedSlice slicePrimary(std::move(i)); + auto dataWithSecondary = resultFiltered->GetIndexInfo() + .AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager()) + .DetachResult(); + TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters); + + auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); + const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true); + auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), + dataWithSecondary.GetSecondaryInplaceData(), pathId, resultFiltered->GetVersion(), resultFiltered->GetSnapshot(), + SaverContext.GetStoragesManager()); + + NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultFiltered->GetIndexInfo().GetReplaceKey())); + NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); + constructor.GetPortionConstructor().AddMetadata(*resultFiltered, deletionsCount, primaryKeys, snapshotKeys); + constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); + if (shardingActualVersion) { + constructor.GetPortionConstructor().SetShardingVersion(*shardingActualVersion); + } + result.emplace_back(std::move(constructor)); + recordIdx += slice.GetRecordsCount(); + } + } + return result; +} + +} diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h new file mode 100644 index 000000000000..c41a99c53427 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h @@ -0,0 +1,47 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NOlap::NCompaction { +class TMerger { +private: + std::vector> Batches; + std::vector> Filters; + const TConstructionContext& Context; + const TSaverContext& SaverContext; + +public: + void AddBatch(const std::shared_ptr& batch, const std::shared_ptr& filter) { + AFL_VERIFY(batch); + Batches.emplace_back(batch); + Filters.emplace_back(filter); + } + + TMerger(const TConstructionContext& context, const TSaverContext& saverContext) + : Context(context) + , SaverContext(saverContext) + { + + } + + TMerger(const TConstructionContext& context, const TSaverContext& saverContext, + std::vector>&& batches, + std::vector>&& filters) + : Batches(std::move(batches)) + , Filters(std::move(filters)) + , Context(context) + , SaverContext(saverContext) { + AFL_VERIFY(Batches.size() == Filters.size()); + } + + std::vector Execute( + const std::shared_ptr& stats, + const std::map& checkPoints, + const std::shared_ptr& resultFiltered, const ui64 pathId, const std::optional shardingActualVersion); +}; +} diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make index aa52c0f9d6a0..9d7a9ba90842 100644 --- 
a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make @@ -5,6 +5,7 @@ SRCS( column_cursor.cpp column_portion_chunk.cpp merged_column.cpp + merger.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index f6b56c88bb61..047e4b137a7f 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -1,81 +1,31 @@ #include "general_compaction.h" -#include "compaction/column_cursor.h" -#include "compaction/column_portion_chunk.h" -#include "compaction/merge_context.h" -#include "compaction/merged_column.h" #include "counters/general.h" +#include "compaction/merger.h" -#include -#include -#include +#include #include -#include -#include -#include -#include -#include namespace NKikimr::NOlap::NCompaction { -void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByFullBatches( - TConstructionContext& context, std::vector&& portions) noexcept { - std::vector> batchResults; - auto resultSchema = context.SchemaVersions.GetLastSchema(); - auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId()); - { - auto resultDataSchema = resultSchema->GetIndexInfo().ArrowSchemaWithSpecials(); - NArrow::NMerger::TMergePartialStream mergeStream( - resultSchema->GetIndexInfo().GetReplaceKey(), resultDataSchema, false, IIndexInfo::GetSnapshotColumnNames()); - - THashSet portionsInUsage; - std::set columnIds; - for (auto&& i : portions) { - if (columnIds.size() != resultSchema->GetColumnsCount()) { - for (auto id : i.GetPortionInfo().GetColumnIds()) { - if (resultSchema->GetFieldIndex(id) > 0) { - columnIds.emplace(id); - } - } - } - AFL_VERIFY(portionsInUsage.emplace(i.GetPortionInfo().GetPortionId()).second); - } - AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount()); - - for (auto&& i : portions) { - auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions); - auto batch = i.RestoreBatch(dataSchema, *resultSchema); - batch = resultSchema->NormalizeBatch(*dataSchema, batch, columnIds).DetachResult(); - IIndexInfo::NormalizeDeletionColumn(*batch); - auto filter = BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), portionsInUsage, resultSchema); - mergeStream.AddSource(batch, filter); - } - batchResults = mergeStream.DrainAllParts(CheckPoints, resultDataSchema->fields()); - } - Y_ABORT_UNLESS(batchResults.size()); - for (auto&& b : batchResults) { - auto portions = MakeAppendedPortions(b, GranuleMeta->GetPathId(), resultSchema->GetSnapshot(), GranuleMeta.get(), context, {}); - Y_ABORT_UNLESS(portions.size()); - for (auto& portion : portions) { - if (shardingActual) { - portion.GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion()); - } - AppendedPortions.emplace_back(std::move(portion)); - } - } -} - std::shared_ptr TGeneralCompactColumnEngineChanges::BuildPortionFilter( const std::optional& shardingActual, const std::shared_ptr& batch, const TPortionInfo& pInfo, const THashSet& portionsInUsage, const ISnapshotSchema::TPtr& resultSchema) const { std::shared_ptr filter; - auto table = batch->BuildTableVerified(); if (shardingActual && pInfo.NeedShardingFilter(*shardingActual)) { + std::set fieldNames; + for (auto&& i : shardingActual->GetShardingInfo()->GetColumnNames()) { + fieldNames.emplace(i); + } + auto table = batch->BuildTableVerified(fieldNames); + 
AFL_VERIFY(table); filter = shardingActual->GetShardingInfo()->GetFilter(table); } NArrow::TColumnFilter filterDeleted = NArrow::TColumnFilter::BuildAllowFilter(); if (pInfo.GetMeta().GetDeletionsCount()) { + auto table = batch->BuildTableVerified(std::set({ TIndexInfo::SPEC_COL_DELETE_FLAG })); + AFL_VERIFY(table); auto col = table->GetColumnByName(TIndexInfo::SPEC_COL_DELETE_FLAG); AFL_VERIFY(col); AFL_VERIFY(col->type()->id() == arrow::Type::BOOL); @@ -128,216 +78,70 @@ std::shared_ptr TGeneralCompactColumnEngineChanges::Build void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks( TConstructionContext& context, std::vector&& portions) noexcept { - static const TString portionIdFieldName = "$$__portion_id"; - static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; - static const std::shared_ptr portionIdField = - std::make_shared(portionIdFieldName, std::make_shared()); - static const std::shared_ptr portionRecordIndexField = - std::make_shared(portionRecordIndexFieldName, std::make_shared()); - auto resultSchema = context.SchemaVersions.GetLastSchema(); auto shardingActual = context.SchemaVersions.GetShardingInfoActual(GranuleMeta->GetPathId()); - std::vector pkFieldNames = resultSchema->GetIndexInfo().GetReplaceKey()->field_names(); - std::set pkFieldNamesSet(pkFieldNames.begin(), pkFieldNames.end()); - for (auto&& i : TIndexInfo::GetSnapshotColumnNames()) { - pkFieldNamesSet.emplace(i); - } - pkFieldNamesSet.emplace(TIndexInfo::SPEC_COL_DELETE_FLAG); - - std::vector> batchResults; - { - arrow::FieldVector indexFields; - indexFields.emplace_back(portionIdField); - indexFields.emplace_back(portionRecordIndexField); - IIndexInfo::AddSpecialFields(indexFields); - auto dataSchema = std::make_shared(indexFields); - NArrow::NMerger::TMergePartialStream mergeStream( - resultSchema->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames()); - THashSet usedPortionIds; - for (auto&& i : portions) { - AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second); - } - - ui32 idx = 0; - for (auto&& i : portions) { - auto dataSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions); - auto batch = i.RestoreBatch(dataSchema, *resultSchema, pkFieldNamesSet); - { - NArrow::NConstruction::IArrayBuilder::TPtr column = - std::make_shared>>( - portionIdFieldName, idx++); - batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate(); - } - { - NArrow::NConstruction::IArrayBuilder::TPtr column = - std::make_shared>>( - portionRecordIndexFieldName); - batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); - } - IIndexInfo::NormalizeDeletionColumn(*batch); - std::shared_ptr filter = - BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultSchema); - mergeStream.AddSource(batch, filter); - } - batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields); - } - std::shared_ptr stats = std::make_shared(); - std::set columnIds; + std::shared_ptr resultFiltered; + NCompaction::TMerger merger(context, SaverContext); { + std::set pkColumnIds; { - THashMap schemas; - for (auto& portion : SwitchedPortions) { - auto dataSchema = portion.GetSchema(context.SchemaVersions); - schemas.emplace(dataSchema->GetVersion(), dataSchema); - } - columnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema); + auto pkColumnIdsVector = IIndexInfo::AddSnapshotFieldIds(resultSchema->GetIndexInfo().GetPKColumnIds()); + pkColumnIds = 
std::set(pkColumnIdsVector.begin(), pkColumnIdsVector.end()); } - for (auto&& i : SwitchedPortions) { - stats->Merge(i.GetSerializationStat(*resultSchema)); - if (columnIds.size() != resultSchema->GetColumnsCount()) { - for (auto id : i.GetColumnIds()) { - if (resultSchema->HasColumnId(id)) { - columnIds.emplace(id); - } + std::set dataColumnIds; + { + { + THashMap schemas; + for (auto& portion : SwitchedPortions) { + auto dataSchema = portion.GetSchema(context.SchemaVersions); + schemas.emplace(dataSchema->GetVersion(), dataSchema); } + dataColumnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema); } - } - AFL_VERIFY(columnIds.size() <= resultSchema->GetColumnsCount()); - } - - std::vector>> chunkGroups; - chunkGroups.resize(batchResults.size()); - for (auto&& columnId : columnIds) { - NActors::TLogContextGuard logGuard( - NActors::TLogContextBuilder::Build()("field_name", resultSchema->GetIndexInfo().GetColumnName(columnId))); - auto columnInfo = stats->GetColumnInfo(columnId); - auto resultField = resultSchema->GetIndexInfo().GetColumnFieldVerified(columnId); - - std::vector cursors; - for (auto&& p : portions) { - auto dataSchema = p.GetPortionInfo().GetSchema(context.SchemaVersions); - auto loader = dataSchema->GetColumnLoaderOptional(columnId); - std::vector records; - std::vector> chunks; - if (!p.ExtractColumnChunks(columnId, records, chunks)) { - if (!loader) { - loader = resultSchema->GetColumnLoaderVerified(columnId); + for (auto&& i : SwitchedPortions) { + stats->Merge(i.GetSerializationStat(*resultSchema)); + if (dataColumnIds.size() != resultSchema->GetColumnsCount()) { + for (auto id : i.GetColumnIds()) { + if (resultSchema->HasColumnId(id)) { + dataColumnIds.emplace(id); + } + } } - auto f = resultSchema->GetFieldByColumnIdVerified(columnId); - chunks.emplace_back(std::make_shared(columnId, p.GetPortionInfo().GetRecordsCount(), - resultField, resultSchema->GetExternalDefaultValueVerified(columnId), - resultSchema->GetColumnSaver(columnId))); - records = { nullptr }; } - AFL_VERIFY(!!loader); - cursors.emplace_back(TPortionColumnCursor(chunks, records, loader, p.GetPortionInfo().GetPortionId())); - } - - ui32 batchesRecordsCount = 0; - ui32 columnRecordsCount = 0; - std::map> columnChunks; - ui32 batchIdx = 0; - for (auto&& batchResult : batchResults) { - const ui32 portionRecordsCountLimit = - batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1; - TColumnMergeContext context( - columnId, resultSchema, portionRecordsCountLimit, NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo); - TMergedColumn mColumn(context); - - auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); - auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); - auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); - Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); - Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); - Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); - const arrow::UInt16Array& pIdxArray = 
static_cast(*columnPortionIdx); - const arrow::UInt32Array& pRecordIdxArray = static_cast(*columnPortionRecordIdx); - - AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); - std::optional predPortionIdx; - for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { - const ui16 portionIdx = pIdxArray.Value(idx); - const ui32 portionRecordIdx = pRecordIdxArray.Value(idx); - auto& cursor = cursors[portionIdx]; - cursor.Next(portionRecordIdx, mColumn); - if (predPortionIdx && portionIdx != *predPortionIdx) { - cursors[*predPortionIdx].Fetch(mColumn); - } - if (idx + 1 == pIdxArray.length()) { - cursor.Fetch(mColumn); - } - predPortionIdx = portionIdx; + AFL_VERIFY(dataColumnIds.size() <= resultSchema->GetColumnsCount()); + if (dataColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { + pkColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); } - chunkGroups[batchIdx][columnId] = mColumn.BuildResult(); - batchesRecordsCount += batchResult->num_rows(); - columnRecordsCount += mColumn.GetRecordsCount(); - AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount()); - ++batchIdx; } - AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount); - } - ui32 batchIdx = 0; - - const auto groups = - resultSchema->GetIndexInfo().GetEntityGroupsByStorageId(IStoragesManager::DefaultStorageId, *SaverContext.GetStoragesManager()); - for (auto&& columnChunks : chunkGroups) { - auto batchResult = batchResults[batchIdx]; - ++batchIdx; - Y_ABORT_UNLESS(columnChunks.size()); - for (auto&& i : columnChunks) { - if (i.second.size() != columnChunks.begin()->second.size()) { - for (ui32 p = 0; p < std::min(columnChunks.begin()->second.size(), i.second.size()); ++p) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())( - "p", i.second[p].DebugString()); - } + resultFiltered = std::make_shared(resultSchema, dataColumnIds); + { + auto seqDataColumnIds = dataColumnIds; + for (auto&& i : pkColumnIds) { + AFL_VERIFY(seqDataColumnIds.erase(i)); } - AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( - "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); - } - - std::vector batchSlices; - std::shared_ptr schemaDetails(new TDefaultSchemaDetails(resultSchema, stats)); - - for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) { - THashMap>> portionColumns; - for (auto&& p : columnChunks) { - portionColumns.emplace(p.first, p.second[i].GetChunks()); + THashSet usedPortionIds; + for (auto&& i : portions) { + AFL_VERIFY(usedPortionIds.emplace(i.GetPortionInfo().GetPortionId()).second); } - batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters); - } - TSimilarPacker slicer(NSplitter::TSplitSettings().GetExpectedPortionSize()); - auto packs = slicer.Split(batchSlices); - ui32 recordIdx = 0; - for (auto&& i : packs) { - TGeneralSerializedSlice slicePrimary(std::move(i)); - auto dataWithSecondary = - resultSchema->GetIndexInfo().AppendIndexes(slicePrimary.GetPortionChunksToHash(), SaverContext.GetStoragesManager()).DetachResult(); - TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, context.Counters.SplitterCounters); - - auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true); - auto constructor = 
TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), - dataWithSecondary.GetSecondaryInplaceData(), GranuleMeta->GetPathId(), - resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager()); - - NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); - NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); - constructor.GetPortionConstructor().AddMetadata(*resultSchema, deletionsCount, primaryKeys, snapshotKeys); - constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); - if (shardingActual) { - constructor.GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion()); + for (auto&& i : portions) { + auto blobsSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions); + auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds); + std::shared_ptr filter = + BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultFiltered); + merger.AddBatch(batch, filter); } - AppendedPortions.emplace_back(std::move(constructor)); - recordIdx += slice.GetRecordsCount(); } } + + std::optional shardingActualVersion; + if (shardingActual) { + shardingActualVersion = shardingActual->GetSnapshotVersion(); + } + AppendedPortions = merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion); } TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { @@ -363,11 +167,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc { std::vector portions = TReadPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs, context.SchemaVersions); - if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) { - BuildAppendedPortionsByChunks(context, std::move(portions)); - } else { - BuildAppendedPortionsByFullBatches(context, std::move(portions)); - } + BuildAppendedPortionsByChunks(context, std::move(portions)); } if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { @@ -417,11 +217,7 @@ void TGeneralCompactColumnEngineChanges::AddCheckPoint( } std::shared_ptr TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() { - if (!HasAppData() || AppDataVerified().ColumnShardConfig.GetUseChunkedMergeOnCompaction()) { - return std::make_shared(); - } else { - return std::make_shared(); - } + return std::make_shared(); } ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index d378b4829b08..29bf0e99b9cb 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -10,7 +10,6 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { using TBase = TCompactColumnEngineChanges; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; std::map CheckPoints; - void BuildAppendedPortionsByFullBatches(TConstructionContext& context, std::vector&& portions) noexcept; void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector&& portions) noexcept; std::shared_ptr BuildPortionFilter(const std::optional& shardingActual, diff --git 
a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index ca2ab41f319b..cd555b42f412 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -1,12 +1,7 @@ #include "indexation.h" -#include -#include + +#include "compaction/merger.h" #include -#include -#include -#include -#include -#include namespace NKikimr::NOlap { @@ -81,6 +76,14 @@ class TPathData { } + std::vector> GetGeneralContainers() const { + std::vector> result; + for (auto&& i : Batches) { + result.emplace_back(i.GetBatch()); + } + return result; + } + bool HasDeletion() { return HasDeletionFlag; } @@ -100,26 +103,6 @@ class TPathData { ShardingInfo = info; } } - - std::shared_ptr Merge( - const std::shared_ptr& sortSchema, const std::shared_ptr& dataSchema) const { - NArrow::NMerger::TMergePartialStream stream(sortSchema, dataSchema, false, IIndexInfo::GetSnapshotColumnNames()); - THashMap fieldSizes; - ui64 rowsCount = 0; - for (auto&& batch : Batches) { - auto& forMerge = batch.GetBatch(); - stream.AddSource(forMerge, nullptr); - for (ui32 cIdx = 0; cIdx < (ui32)forMerge->GetColumnsCount(); ++cIdx) { - fieldSizes[forMerge->GetSchema()->GetFieldVerified(cIdx)->name()] += forMerge->GetColumnVerified(cIdx)->GetRawSize().value_or(0); - } - rowsCount += forMerge->num_rows(); - } - - NArrow::NMerger::TRecordBatchBuilder builder(dataSchema->fields(), rowsCount, fieldSizes); - stream.SetPossibleSameVersion(true); - stream.DrainAll(builder); - return builder.Finalize(); - } }; class TPathesData { @@ -204,37 +187,26 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont } Y_ABORT_UNLESS(Blobs.IsEmpty()); - const std::vector comparableColumns = resultSchema->GetIndexInfo().GetReplaceKey()->field_names(); - auto filteredSchema = resultSchema->GetIndexInfo().GetColumnsSchema(usageColumnIds); + auto filteredSnapshot = std::make_shared(resultSchema, usageColumnIds); + auto stats = std::make_shared(); + std::vector> filters; for (auto& [pathId, pathInfo] : pathBatches.GetData()) { - auto shardingFilter = context.SchemaVersions.GetShardingInfoActual(pathId); - auto mergedBatch = pathInfo.Merge(resultSchema->GetIndexInfo().GetReplaceKey(), filteredSchema); - Y_DEBUG_ABORT_UNLESS(NArrow::IsSortedAndUnique(mergedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); + std::optional shardingVersion; + if (pathInfo.GetShardingInfo()) { + shardingVersion = pathInfo.GetShardingInfo()->GetSnapshotVersion(); + } + auto batches = pathInfo.GetGeneralContainers(); + filters.resize(batches.size()); auto itGranule = PathToGranule.find(pathId); AFL_VERIFY(itGranule != PathToGranule.end()); - std::vector> result = - NArrow::NMerger::TRWSortableBatchPosition::SplitByBordersInSequentialContainer(mergedBatch, comparableColumns, itGranule->second); - for (auto&& b : result) { - if (!b) { - continue; - } - std::optional externalSaver; - if (b->num_rows() < 100) { - externalSaver = NArrow::NSerialization::TSerializerContainer( - std::make_shared(arrow::Compression::type::UNCOMPRESSED)); - } else { - externalSaver = NArrow::NSerialization::TSerializerContainer( - std::make_shared(arrow::Compression::type::LZ4_FRAME)); - } - auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context, externalSaver); - Y_ABORT_UNLESS(portions.size()); - for (auto& portion : portions) { - if (pathInfo.GetShardingInfo()) { - 
portion.GetPortionConstructor().SetShardingVersion(pathInfo.GetShardingInfo()->GetSnapshotVersion()); - } - AppendedPortions.emplace_back(TWritePortionInfoWithBlobsResult(std::move(portion))); - } + std::map points; + for (auto&& i : itGranule->second) { + AFL_VERIFY(points.emplace(i, false).second); + } + auto localAppended = NCompaction::TMerger(context, SaverContext, std::move(batches), std::move(filters)).Execute(stats, points, filteredSnapshot, pathId, shardingVersion); + for (auto&& i : localAppended) { + AppendedPortions.emplace_back(std::move(i)); } } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 3091b5a61068..ab480d5479af 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -710,7 +710,11 @@ std::shared_ptr TPortionInfo::TPreparedColumn::Assembl ui64 recordsCount = 0; for (auto& blob : Blobs) { chunks.push_back(blob.BuildDeserializeChunk(Loader)); - recordsCount += blob.GetExpectedRowsCountVerified(); + if (!!blob.GetData()) { + recordsCount += blob.GetExpectedRowsCountVerified(); + } else { + recordsCount += blob.GetDefaultRowsCount(); + } } return std::make_shared(recordsCount, Loader, std::move(chunks)); @@ -773,11 +777,16 @@ std::shared_ptr TPortionInfo::TPreparedBatchData::Ass return std::make_shared(fields, std::move(columns)); } -std::shared_ptr TPortionInfo::TPreparedBatchData::AssembleToGeneralContainer() const { +std::shared_ptr TPortionInfo::TPreparedBatchData::AssembleToGeneralContainer( + const std::set& sequentialColumnIds) const { std::vector> columns; std::vector> fields; for (auto&& i : Columns) { - columns.emplace_back(i.AssembleAccessor()); + if (sequentialColumnIds.contains(i.GetColumnId())) { + columns.emplace_back(i.AssembleForSeqAccess()); + } else { + columns.emplace_back(i.AssembleAccessor()); + } fields.emplace_back(i.GetField()); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index b4f06b16597e..ad1b661dea12 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -752,7 +752,7 @@ class TPortionInfo { } std::shared_ptr Assemble(const TAssembleOptions& options = {}) const; - std::shared_ptr AssembleToGeneralContainer() const; + std::shared_ptr AssembleToGeneralContainer(const std::set& sequentialColumnIds) const; std::shared_ptr AssembleTable(const TAssembleOptions& options = {}) const; std::shared_ptr AssembleForSeqAccess() const; }; diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 7e4164b64da0..7d74512e1696 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -14,19 +14,13 @@ void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr TReadPortionInfoWithBlobs::RestoreBatch( - const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames) const { - Y_ABORT_UNLESS(data); + const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set& seqColumns) const { THashMap blobs; for (auto&& i : PortionInfo.Records) { blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk); Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size); } - if (columnNames.empty()) { - return 
PortionInfo.PrepareForAssemble(*data, result, blobs).AssembleToGeneralContainer(); - } else { - auto filteredSchema = std::make_shared(data, columnNames); - return PortionInfo.PrepareForAssemble(*data, *filteredSchema, blobs).AssembleToGeneralContainer(); - } + return PortionInfo.PrepareForAssemble(data, resultSchema, blobs).AssembleToGeneralContainer(seqColumns); } NKikimr::NOlap::TReadPortionInfoWithBlobs TReadPortionInfoWithBlobs::RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) { @@ -70,21 +64,16 @@ bool TReadPortionInfoWithBlobs::ExtractColumnChunks(const ui32 entityId, std::ve if (records.empty()) { return false; } - std::map> chunksMap; + std::vector> chunksLocal; for (auto it = Chunks.begin(); it != Chunks.end();) { if (it->first.GetEntityId() == entityId) { - chunksMap.emplace(it->first, std::move(it->second)); + AFL_VERIFY(chunksLocal.empty() || chunksLocal.back()->GetChunkAddressVerified() < it->second->GetChunkAddressVerified()); + chunksLocal.emplace_back(std::move(it->second)); it = Chunks.erase(it); } else { ++it; } } - std::vector> chunksLocal; - for (auto&& i : chunksMap) { - Y_ABORT_UNLESS(i.first.GetColumnId() == entityId); - Y_ABORT_UNLESS(i.first.GetChunk() == chunksLocal.size()); - chunksLocal.emplace_back(i.second); - } std::swap(chunksLocal, chunks); return true; } diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h index 96134c83161b..a9e24eb3c165 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h @@ -38,7 +38,7 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { static TReadPortionInfoWithBlobs RestorePortion(const TPortionInfo& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo); - std::shared_ptr RestoreBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames = {}) const; + std::shared_ptr RestoreBatch(const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set& seqColumns) const; static std::optional SyncPortion(TReadPortionInfoWithBlobs&& source, const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr& storages, std::shared_ptr counters); diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index 7a248955238c..2776bc319a81 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -100,6 +100,14 @@ class IIndexInfo { return result; } + [[nodiscard]] static std::vector AddSnapshotFieldIds(const std::vector& baseColumnIds) { + std::vector result = baseColumnIds; + for (auto&& i : GetSnapshotColumnIds()) { + result.emplace_back(i); + } + return result; + } + std::optional GetColumnIdOptional(const std::string& name) const; TString GetColumnName(ui32 id, bool required) const; static std::shared_ptr GetColumnFieldOptional(const ui32 columnId); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 2a5ac55ab415..5bdfc2838eb9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -157,10 +157,11 @@ void TIndexInfo::SetAllKeys(const 
std::shared_ptr& operators) /// * apply REPLACE by MergeSort /// * apply PK predicate before REPLACE { + AFL_VERIFY(PKColumnIds.empty()); const auto& primaryKeyNames = NamesOnly(GetPrimaryKeyColumns()); - auto columnIds = GetColumnIds(primaryKeyNames); - AFL_VERIFY(columnIds.size()); - PrimaryKey = MakeArrowSchema(Columns, columnIds); + PKColumnIds = GetColumnIds(primaryKeyNames); + AFL_VERIFY(PKColumnIds.size()); + PrimaryKey = MakeArrowSchema(Columns, PKColumnIds); } for (const auto& [colId, column] : Columns) { diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index b95a679fe25e..622f8c741050 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -252,6 +252,10 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema, public IIndexInfo { std::vector GetColumnNames(const std::vector& ids) const; std::vector GetColumnSTLNames(const std::vector& ids) const; const std::vector& GetColumnIds(const bool withSpecial = true) const; + const std::vector& GetPKColumnIds() const { + AFL_VERIFY(PKColumnIds.size()); + return PKColumnIds; + } std::vector GetEntityIds() const; /// Returns info of columns defined by specific ids. @@ -317,6 +321,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema, public IIndexInfo { TString Name; std::vector SchemaColumnIds; std::vector SchemaColumnIdsWithSpecials; + std::vector PKColumnIds; std::shared_ptr Schema; std::shared_ptr SchemaWithSpecials; std::shared_ptr PrimaryKey; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h index e9fa1b41b7c2..1b515d5bb9cf 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h @@ -9,7 +9,7 @@ namespace NKikimr::NOlap { class TFilteredSnapshotSchema: public ISnapshotSchema { ISnapshotSchema::TPtr OriginalSnapshot; std::shared_ptr Schema; - std::set ColumnIds; + YDB_READONLY_DEF(std::set, ColumnIds); protected: virtual TString DoDebugString() const override; public: From 82e0b1305c2881aac34b5e713728395a93070863 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 21 Jul 2024 18:52:01 +0300 Subject: [PATCH 2/4] fixes --- ydb/core/formats/arrow/common/accessor.h | 3 ++ .../changes/compaction/column_cursor.cpp | 6 ++- .../changes/compaction/merge_context.h | 12 +++-- .../engines/changes/compaction/merger.cpp | 15 +++++- .../engines/changes/compaction/merger.h | 1 + .../engines/changes/indexation.cpp | 4 +- .../engines/changes/with_appended.cpp | 47 ------------------- .../engines/changes/with_appended.h | 2 - 8 files changed, 34 insertions(+), 56 deletions(-) diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index cfadfff13a7f..8d10f2ae2e93 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -95,6 +95,9 @@ class IChunkedArray { ui32 startIndex = 0; ui64 idx = 0; if (chunkCurrent) { + if (position < chunkCurrent->GetFinishPosition()) { + return *chunkCurrent; + } AFL_VERIFY(chunkCurrent->GetChunkIndex() < accessor.GetChunksCount()); startIndex = chunkCurrent->GetChunkIndex(); idx = chunkCurrent->GetStartPosition(); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp index c99d6b3200f0..270336fd5e66 
100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp @@ -6,7 +6,11 @@ namespace NKikimr::NOlap::NCompaction { bool TPortionColumnCursor::Fetch(TMergedColumn& column) { Y_ABORT_UNLESS(RecordIndexStart); // NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId)); - CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart); + if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) { + + } else { + CurrentChunk = BlobChunks->GetChunk(CurrentChunk, *RecordIndexStart); + } ui32 currentStart = *RecordIndexStart; while (RecordIndexFinish >= CurrentChunk->GetFinishPosition()) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h index a5da857c2aff..80356224909f 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h @@ -23,6 +23,7 @@ class TColumnMergeContext { YDB_READONLY(bool, UseWholeChunksOptimization, true); std::optional ColumnStat; + const TIndexInfo& IndexInfo; public: ISnapshotSchema::TPtr GetSchemaInfo() const { @@ -41,8 +42,9 @@ class TColumnMergeContext { return IndexInfo; } - TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit, const ui32 chunkRawBytesLimit, - const std::optional& columnStat) + TColumnMergeContext(const ui32 columnId, const ISnapshotSchema::TPtr& schema, const ui32 portionRowsCountLimit, + const ui32 chunkRawBytesLimit, const std::optional& columnStat, + const NArrow::NSerialization::TSerializerContainer& overrideSerializer) : ColumnId(columnId) , SchemaInfo(schema) , Saver(schema->GetColumnSaver(columnId)) @@ -52,10 +54,12 @@ class TColumnMergeContext { , ChunkRawBytesLimit(chunkRawBytesLimit) , UseWholeChunksOptimization(!schema->GetIndexInfo().GetReplaceKey()->GetFieldByName(ResultField->name())) , ColumnStat(columnStat) - , IndexInfo(schema->GetIndexInfo()) - { + , IndexInfo(schema->GetIndexInfo()) { Y_ABORT_UNLESS(PortionRowsCountLimit); Y_ABORT_UNLESS(ChunkRawBytesLimit); + if (!!overrideSerializer) { + Saver.ResetSerializer(overrideSerializer); + } } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 5cbb99229d27..ca960f240cb9 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace NKikimr::NOlap::NCompaction { @@ -77,8 +78,20 @@ std::vector TMerger::Execute(c for (auto&& batchResult : batchResults) { const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / NSplitter::TSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1; + + NArrow::NSerialization::TSerializerContainer externalSaver; + if (OptimizationWritingPackMode) { + if (batchResult->num_rows() < 100) { + externalSaver = NArrow::NSerialization::TSerializerContainer( + std::make_shared(arrow::Compression::type::UNCOMPRESSED)); + } else { + externalSaver = NArrow::NSerialization::TSerializerContainer( + std::make_shared(arrow::Compression::type::LZ4_FRAME)); + } + } + NCompaction::TColumnMergeContext context(columnId, resultFiltered, 
portionRecordsCountLimit, - NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo); + NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver); NCompaction::TMergedColumn mColumn(context); auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h index c41a99c53427..390c47e6e5a4 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h @@ -10,6 +10,7 @@ namespace NKikimr::NOlap::NCompaction { class TMerger { private: + YDB_ACCESSOR(bool, OptimizationWritingPackMode, false); std::vector> Batches; std::vector> Filters; const TConstructionContext& Context; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index cd555b42f412..c2eb3ffeb396 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -204,7 +204,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont for (auto&& i : itGranule->second) { AFL_VERIFY(points.emplace(i, false).second); } - auto localAppended = NCompaction::TMerger(context, SaverContext, std::move(batches), std::move(filters)).Execute(stats, points, filteredSnapshot, pathId, shardingVersion); + NCompaction::TMerger merger(context, SaverContext, std::move(batches), std::move(filters)); + merger.SetOptimizationWritingPackMode(true); + auto localAppended = merger.Execute(stats, points, filteredSnapshot, pathId, shardingVersion); for (auto&& i : localAppended) { AppendedPortions.emplace_back(std::move(i)); } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 4b1d779a6c10..3c4385b62fba 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -102,53 +102,6 @@ void TChangesWithAppend::DoOnAfterCompile() { } } -std::vector TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr batch, - const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional& overrideSaver) const { - Y_ABORT_UNLESS(batch->num_rows()); - - auto resultSchema = context.SchemaVersions.GetSchema(snapshot); - - std::shared_ptr stats = std::make_shared(); - if (granuleMeta) { - stats = granuleMeta->BuildSerializationStats(resultSchema); - } - auto schema = std::make_shared(resultSchema, stats); - if (overrideSaver) { - schema->SetOverrideSerializer(*overrideSaver); - } - std::vector out; - { - std::vector pages = TBatchSerializedSlice::BuildSimpleSlices(batch, NSplitter::TSplitSettings(), context.Counters.SplitterCounters, schema); - std::vector generalPages; - for (auto&& i : pages) { - generalPages.emplace_back(i.GetPortionChunksToHash(), schema, context.Counters.SplitterCounters); - } - - const NSplitter::TEntityGroups groups = resultSchema->GetIndexInfo().GetEntityGroupsByStorageId(IStoragesManager::DefaultStorageId, *SaverContext.GetStoragesManager()); - TSimilarPacker slicer(NSplitter::TSplitSettings().GetExpectedPortionSize()); - auto packs = slicer.Split(generalPages); - - ui32 recordIdx = 0; - for (auto&& i : packs) { - TGeneralSerializedSlice slicePrimary(std::move(i)); - auto dataWithSecondary = - 
From 03f1b2969059d332ab40a339b202ccefa0cc3671 Mon Sep 17 00:00:00 2001
From: ivanmorozov333
Date: Sun, 21 Jul 2024 18:55:59 +0300
Subject: [PATCH 3/4] fix

---
 ydb/core/kqp/ut/olap/sys_view_ut.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
index dc234ac3bffe..b49726d5a01b 100644
--- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
@@ -242,7 +242,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
             ui64 rawBytes2;
             ui64 bytes2;
             helper.GetVolumes(rawBytes2, bytes2, false, {"new_column_ui64"});
-            AFL_VERIFY(rawBytes2 == 6500041)("real", rawBytes2);
+            AFL_VERIFY(rawBytes2 == 6500023)("real", rawBytes2);
             AFL_VERIFY(bytes2 == 45360)("b", bytes2);
         }
     }

From 4c91adb027b3deb8fc2751cf79fe2818b2d59ec8 Mon Sep 17 00:00:00 2001
From: ivanmorozov333
Date: Mon, 22 Jul 2024 09:12:41 +0300
Subject: [PATCH 4/4] unify separation points usage

---
 ydb/core/formats/arrow/reader/merger.cpp | 4 +-
 ydb/core/formats/arrow/reader/merger.h | 2 +-
 ydb/core/formats/arrow/reader/position.h | 72 +++++++++++++++++++
 ydb/core/kqp/ut/olap/sys_view_ut.cpp | 2 +-
 .../engines/changes/compaction/merger.cpp | 2 +-
 .../engines/changes/compaction/merger.h | 2 +-
 .../engines/changes/general_compaction.cpp | 4 +-
 .../engines/changes/general_compaction.h | 4 +-
 .../engines/changes/indexation.cpp | 6 +-
 .../columnshard/engines/changes/indexation.h | 2 +-
 .../engines/column_engine_logs.cpp | 2 +-
 .../engines/storage/granule/granule.h | 2 +-
 .../storage/optimizer/abstract/optimizer.h | 2 +-
 .../optimizer/lbuckets/planner/optimizer.h | 18 ++---
 .../optimizer/sbuckets/index/bucket.cpp | 2 +-
 .../storage/optimizer/sbuckets/index/index.h | 6 +-
 .../optimizer/sbuckets/optimizer/optimizer.h | 2 +-
 17 files changed, 101 insertions(+), 33 deletions(-)

diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp
index ddae86c1ed28..a09983971be8 100644
--- a/ydb/core/formats/arrow/reader/merger.cpp
+++ b/ydb/core/formats/arrow/reader/merger.cpp
@@ -185,13 +185,13 @@ void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std
     SortHeap.CleanFinished();
 }
 
-std::vector> TMergePartialStream::DrainAllParts(const std::map& positions,
+std::vector> TMergePartialStream::DrainAllParts(const TIntervalPositions& positions,
     const std::vector>& resultFields) {
     std::vector> result;
     for (auto&& i : positions) {
         TRecordBatchBuilder indexesBuilder(resultFields);
-        DrainCurrentTo(indexesBuilder, i.first, i.second);
+        DrainCurrentTo(indexesBuilder, i.GetPosition(), i.IsIncludedToLeftInterval());
         result.emplace_back(indexesBuilder.Finalize());
         if (result.back()->num_rows() == 0) {
             result.pop_back();
         }
diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h
index 196edcd09e3c..972e891fe1fd 100644
--- a/ydb/core/formats/arrow/reader/merger.h
+++ b/ydb/core/formats/arrow/reader/merger.h
@@ -94,7 +94,7 @@ class TMergePartialStream {
     std::shared_ptr SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr);
     bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr);
     bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional* lastResultPosition = nullptr);
-    std::vector> DrainAllParts(const std::map& positions,
+    std::vector> DrainAllParts(const TIntervalPositions& positions,
         const std::vector>& resultFields);
 };
 
diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h
index 8a6e15fd79ac..5b8fd35d5bc7 100644
--- a/ydb/core/formats/arrow/reader/position.h
+++ b/ydb/core/formats/arrow/reader/position.h
@@ -404,6 +404,78 @@ class TSortableBatchPosition {
 };
 
+class TIntervalPosition {
+private:
+    TSortableBatchPosition Position;
+    const bool LeftIntervalInclude;
+public:
+    const TSortableBatchPosition& GetPosition() const {
+        return Position;
+    }
+    bool IsIncludedToLeftInterval() const {
+        return LeftIntervalInclude;
+    }
+    TIntervalPosition(TSortableBatchPosition&& position, const bool leftIntervalInclude)
+        : Position(std::move(position))
+        , LeftIntervalInclude(leftIntervalInclude) {
+
+    }
+
+    TIntervalPosition(const TSortableBatchPosition& position, const bool leftIntervalInclude)
+        : Position(position)
+        , LeftIntervalInclude(leftIntervalInclude) {
+
+    }
+
+    bool operator<(const TIntervalPosition& item) const {
+        std::partial_ordering cmp = Position.Compare(item.Position);
+        if (cmp == std::partial_ordering::equivalent) {
+            return (LeftIntervalInclude ? 1 : 0) < (item.LeftIntervalInclude ? 1 : 0);
+        }
+        return cmp == std::partial_ordering::less;
+    }
+
+    NJson::TJsonValue DebugJson() const {
+        NJson::TJsonValue result = NJson::JSON_MAP;
+        result.InsertValue("position", Position.DebugJson());
+        result.InsertValue("include", LeftIntervalInclude);
+        return result;
+    }
+};
+
+class TIntervalPositions {
+private:
+    std::vector Positions;
+public:
+    bool IsEmpty() const {
+        return Positions.empty();
+    }
+
+    std::vector::const_iterator begin() const {
+        return Positions.begin();
+    }
+
+    std::vector::const_iterator end() const {
+        return Positions.end();
+    }
+
+    void AddPosition(TSortableBatchPosition&& position, const bool includeLeftInterval) {
+        TIntervalPosition intervalPosition(std::move(position), includeLeftInterval);
+        if (Positions.size()) {
+            AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
+        }
+        Positions.emplace_back(std::move(intervalPosition));
+    }
+
+    void AddPosition(const TSortableBatchPosition& position, const bool includeLeftInterval) {
+        TIntervalPosition intervalPosition(position, includeLeftInterval);
+        if (Positions.size()) {
+            AFL_VERIFY(Positions.back() < intervalPosition)("back", Positions.back().DebugJson())("pos", intervalPosition.DebugJson());
+        }
+        Positions.emplace_back(std::move(intervalPosition));
+    }
+};
+
 class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly {
 private:
     using TBase = TSortableBatchPosition;
diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
index b49726d5a01b..13ff57223d8e 100644
--- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
@@ -243,7 +243,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
             ui64 bytes2;
             helper.GetVolumes(rawBytes2, bytes2, false, {"new_column_ui64"});
             AFL_VERIFY(rawBytes2 == 6500023)("real", rawBytes2);
-            AFL_VERIFY(bytes2 == 45360)("b", bytes2);
+            AFL_VERIFY(bytes2 == 38880)("b", bytes2);
         }
     }
 
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
index ca960f240cb9..0b2e93b16d27 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
@@ -14,7 +14,7 @@ namespace NKikimr::NOlap::NCompaction {
 
 std::vector TMerger::Execute(const std::shared_ptr& stats,
-    const std::map& checkPoints, const std::shared_ptr& resultFiltered,
+    const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr& resultFiltered,
     const ui64 pathId, const std::optional shardingActualVersion) {
     AFL_VERIFY(Batches.size() == Filters.size());
     static const TString portionIdFieldName = "$$__portion_id";
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h
index 390c47e6e5a4..be9beae47584 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.h
@@ -42,7 +42,7 @@ class TMerger {
 
     std::vector Execute(
         const std::shared_ptr& stats,
-        const std::map& checkPoints,
+        const NArrow::NMerger::TIntervalPositions& checkPoints,
         const std::shared_ptr& resultFiltered, const ui64 pathId, const std::optional shardingActualVersion);
 };
 }
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 047e4b137a7f..bfbd1eac5133 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -212,8 +212,8 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
 }
 
 void TGeneralCompactColumnEngineChanges::AddCheckPoint(
-    const NArrow::NMerger::TSortableBatchPosition& position, const bool include, const bool validationDuplications) {
-    AFL_VERIFY(CheckPoints.emplace(position, include).second || !validationDuplications);
+    const NArrow::NMerger::TSortableBatchPosition& position, const bool include) {
+    CheckPoints.AddPosition(position, include);
 }
 
 std::shared_ptr TGeneralCompactColumnEngineChanges::BuildMemoryPredictor() {
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
index 29bf0e99b9cb..ab6f1e18684e 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -9,7 +9,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
 private:
     using TBase = TCompactColumnEngineChanges;
     virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
-    std::map CheckPoints;
+    NArrow::NMerger::TIntervalPositions CheckPoints;
     void BuildAppendedPortionsByChunks(TConstructionContext& context, std::vector&& portions) noexcept;
 
     std::shared_ptr BuildPortionFilter(const std::optional& shardingActual,
@@ -63,7 +63,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
 
     static std::shared_ptr BuildMemoryPredictor();
 
-    void AddCheckPoint(const NArrow::NMerger::TSortableBatchPosition& position, const bool include = true, const bool validationDuplications = true);
+    void AddCheckPoint(const NArrow::NMerger::TSortableBatchPosition& position, const bool include);
 
     virtual TString TypeString() const override {
         return StaticTypeName();
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index c2eb3ffeb396..9fced30f5c9e 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -200,13 +200,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
 
         auto itGranule = PathToGranule.find(pathId);
         AFL_VERIFY(itGranule != PathToGranule.end());
-        std::map points;
-        for (auto&& i : itGranule->second) {
-            AFL_VERIFY(points.emplace(i, false).second);
-        }
         NCompaction::TMerger merger(context, SaverContext, std::move(batches), std::move(filters));
         merger.SetOptimizationWritingPackMode(true);
-        auto localAppended = merger.Execute(stats, points, filteredSnapshot, pathId, shardingVersion);
+        auto localAppended = merger.Execute(stats, itGranule->second, filteredSnapshot, pathId, shardingVersion);
         for (auto&& i : localAppended) {
             AppendedPortions.emplace_back(std::move(i));
         }
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index d9a1b7c14599..d130612b7451 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -32,7 +32,7 @@ class TInsertColumnEngineChanges: public TChangesWithAppend {
     }
 
 public:
-    THashMap> PathToGranule; // pathId -> positions (sorted by pk)
+    THashMap PathToGranule; // pathId -> positions (sorted by pk)
 
 public:
     TInsertColumnEngineChanges(std::vector&& dataToIndex, const TSaverContext& saverContext)
         : TBase(saverContext, NBlobOperations::EConsumer::INDEXATION)
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 51a2bad07ea9..e7fda23311ae 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -285,7 +285,7 @@ std::shared_ptr TColumnEngineForLogs::StartInsert(st
         if (changes->PathToGranule.contains(pathId)) {
             continue;
         }
-        changes->PathToGranule[pathId] = GetGranulePtrVerified(pathId)->GetBucketPositions();
+        AFL_VERIFY(changes->PathToGranule.emplace(pathId, GetGranulePtrVerified(pathId)->GetBucketPositions()).second);
     }
 
     return changes;
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
index 3b3db33a72cf..c8b3e302f1e7 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h
@@ -196,7 +196,7 @@ class TGranuleMeta: TNonCopyable {
         return OptimizerPlanner->SerializeToJsonVisual();
     }
 
-    std::vector GetBucketPositions() const {
+    NArrow::NMerger::TIntervalPositions GetBucketPositions() const {
        return OptimizerPlanner->GetBucketPositions();
     }
 
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
index 21647072eb60..4bd196e552d0 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -134,7 +134,7 @@ class IOptimizerPlanner {
         return DoDebugString();
     }
 
-    virtual std::vector GetBucketPositions() const = 0;
+    virtual NArrow::NMerger::TIntervalPositions GetBucketPositions() const = 0;
     bool IsLocked(const std::shared_ptr& dataLocksManager) const {
         return DoIsLocked(dataLocksManager);
     }
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
index 246b349f2a77..f83183c04e39 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/planner/optimizer.h
@@ -899,15 +899,15 @@ class TPortionsBucket: public TMoveOnly {
         auto result = std::make_shared(granule, portions, saverContext);
         if (MainPortion) {
             NArrow::NMerger::TSortableBatchPosition pos(MainPortion->IndexKeyStart().ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false);
-            result->AddCheckPoint(pos, false, false);
+            result->AddCheckPoint(pos, false);
         }
         if (!nextBorder && MainPortion && !forceMergeForTests) {
             NArrow::NMerger::TSortableBatchPosition pos(MainPortion->IndexKeyEnd().ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false);
-            result->AddCheckPoint(pos, true, false);
+            result->AddCheckPoint(pos, true);
         }
         if (stopPoint) {
             NArrow::NMerger::TSortableBatchPosition pos(stopPoint->ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false);
-            result->AddCheckPoint(pos, false, false);
+            result->AddCheckPoint(pos, false);
         }
         return result;
     }
@@ -1181,15 +1181,15 @@ class TPortionBuckets {
         }
     }
 
-    std::vector GetBucketPositions() const {
-        std::vector result;
+    NArrow::NMerger::TIntervalPositions GetBucketPositions() const {
+        NArrow::NMerger::TIntervalPositions result;
         for (auto&& i : Buckets) {
             AFL_VERIFY(i.second->GetStartPos());
-            result.emplace_back(*i.second->GetStartPos());
+            result.AddPosition(*i.second->GetStartPos(), false);
         }
-        if (Buckets.size()) {
+        if (Buckets.size() && Buckets.rbegin()->second->GetPortion()->GetRecordsCount() > 1) {
             NArrow::NMerger::TSortableBatchPosition pos(Buckets.rbegin()->second->GetPortion()->IndexKeyEnd().ToBatch(PrimaryKeysSchema), 0, PrimaryKeysSchema->field_names(), {}, false);
-            result.emplace_back(pos);
+            result.AddPosition(std::move(pos), false);
         }
         return result;
     }
@@ -1254,7 +1254,7 @@ class TOptimizerPlanner: public IOptimizerPlanner {
 
 public:
-    virtual std::vector GetBucketPositions() const override {
+    virtual NArrow::NMerger::TIntervalPositions GetBucketPositions() const override {
         return Buckets.GetBucketPositions();
     }
 
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/bucket.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/bucket.cpp
index 4fd497984104..5c70d26a38a3 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/bucket.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/bucket.cpp
@@ -28,7 +28,7 @@ std::shared_ptr TPortionsBucket::BuildOpti
     auto result = std::make_shared(granule, context.GetPortions(), saverContext);
     for (auto&& i : context.GetSplitRightOpenIntervalPoints()) {
         NArrow::NMerger::TSortableBatchPosition pos(i.ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false);
-        result->AddCheckPoint(pos, false, false);
+        result->AddCheckPoint(pos, false);
     }
     return result;
 }
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/index.h b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/index.h
index a551fbbb8a1d..56bddb8547fb 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/index.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/index/index.h
@@ -220,14 +220,14 @@ class TPortionBuckets {
         return bucketForOptimization->BuildOptimizationTask(granule, locksManager, PrimaryKeysSchema, StoragesManager);
     }
 
-    std::vector GetBucketPositions() const {
-        std::vector result;
+    NArrow::NMerger::TIntervalPositions GetBucketPositions() const {
+        NArrow::NMerger::TIntervalPositions result;
         for (auto&& i : Buckets) {
             if (!i.first.HasValue()) {
                 continue;
             }
             NArrow::NMerger::TSortableBatchPosition posStart(i.first.GetValueVerified().ToBatch(PrimaryKeysSchema), 0, PrimaryKeysSchema->field_names(), {}, false);
-            result.emplace_back(posStart);
+            result.AddPosition(std::move(posStart), false);
         }
         return result;
     }
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/optimizer/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/optimizer/optimizer.h
index 4e8595e20f1d..7d756f09deff 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/optimizer/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/sbuckets/optimizer/optimizer.h
@@ -64,7 +64,7 @@ class TOptimizerPlanner: public IOptimizerPlanner {
     }
 
 public:
-    virtual std::vector GetBucketPositions() const override {
+    virtual NArrow::NMerger::TIntervalPositions GetBucketPositions() const override {
         return Buckets.GetBucketPositions();
     }
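Reviewer note on the new checkpoint container used throughout patch 4/4: TIntervalPositions replaces the std::map of position-to-include flags that callers assembled by hand, and its AddPosition verifies that checkpoints arrive in strictly increasing order (for equal keys, the include flag breaks the tie). The self-contained sketch below only models that contract with plain integers standing in for TSortableBatchPosition; names and the main() driver are illustrative, not from the tree.

#include <cassert>
#include <utility>
#include <vector>

// Simplified model of TIntervalPosition: a split point plus a flag telling
// whether a row equal to the point belongs to the interval on its left.
struct TSplitPoint {
    int Key;                  // stands in for TSortableBatchPosition
    bool LeftIntervalInclude;

    bool operator<(const TSplitPoint& other) const {
        if (Key == other.Key) {
            return static_cast<int>(LeftIntervalInclude) < static_cast<int>(other.LeftIntervalInclude);
        }
        return Key < other.Key;
    }
};

// Simplified model of TIntervalPositions: append-only, order is the caller's duty.
class TSplitPoints {
public:
    void AddPosition(TSplitPoint point) {
        // Mirrors the AFL_VERIFY in AddPosition: checkpoints must be appended in
        // strictly increasing order, so no sorting or de-duplication happens here.
        if (!Points.empty()) {
            assert(Points.back() < point);
        }
        Points.push_back(std::move(point));
    }
    const std::vector<TSplitPoint>& Get() const { return Points; }

private:
    std::vector<TSplitPoint> Points;
};

int main() {
    TSplitPoints points;
    points.AddPosition({10, false});  // rows with key == 10 open the next interval
    points.AddPosition({10, true});   // same key, but key == 10 stays in the left interval
    points.AddPosition({25, false});
    assert(points.Get().size() == 3);
    return 0;
}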