From 506253677f97f866e19cbf0885ac95de850e6ab3 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Fri, 21 Feb 2025 19:30:35 +0300 Subject: [PATCH 1/9] squashed --- .../tx/columnshard/counters/portion_index.cpp | 6 +- .../engines/reader/common/description.h | 1 - .../constructor/read_metadata.cpp | 7 + .../common_reader/constructor/read_metadata.h | 7 +- .../common_reader/iterator/columns_set.h | 9 ++ .../common_reader/iterator/fetch_steps.h | 2 +- .../common_reader/iterator/fetching.cpp | 57 +++++---- .../reader/common_reader/iterator/fetching.h | 72 +++++------ .../reader/common_reader/iterator/ya.make | 1 + .../plain_reader/constructor/constructor.cpp | 3 +- .../reader/plain_reader/iterator/context.cpp | 120 +++++++++--------- .../simple_reader/constructor/constructor.cpp | 3 +- .../reader/simple_reader/iterator/context.cpp | 66 +++++----- .../simple_reader/iterator/fetching.cpp | 7 +- 14 files changed, 185 insertions(+), 176 deletions(-) diff --git a/ydb/core/tx/columnshard/counters/portion_index.cpp b/ydb/core/tx/columnshard/counters/portion_index.cpp index 0a148c302c20..5e3c8ff7ce28 100644 --- a/ydb/core/tx/columnshard/counters/portion_index.cpp +++ b/ydb/core/tx/columnshard/counters/portion_index.cpp @@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) { { auto findClass = TotalStats.find(portionClass); - AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId()); findClass->second.RemovePortion(portion); if (findClass->second.IsEmpty()) { TotalStats.erase(findClass); @@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) { { auto findPathId = StatsByPathId.find(portion.GetPathId()); - AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId()); auto findClass = findPathId->second.find(portionClass); - AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId()); findClass->second.RemovePortion(portion); if (findClass->second.IsEmpty()) { findPathId->second.erase(findClass); diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index 9be71450515e..e81333389d12 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -31,7 +31,6 @@ struct TReadDescription { std::vector ColumnIds; const std::shared_ptr& GetScanCursor() const { - AFL_VERIFY(ScanCursor); return ScanCursor; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index dfb8fc36f341..8239e3e161ac 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init( return TConclusionStatus::Success(); } +TReadMetadata::TReadMetadata(const std::shared_ptr& schemaIndex, const TReadDescription& read) + : TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, + read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursor()) + , PathId(read.PathId) + , ReadStats(std::make_shared()) { +} + std::set TReadMetadata::GetEarlyFilterColumnIds() const { auto& indexInfo = ResultIndexSchema->GetIndexInfo(); const auto& ids = GetProgram().GetEarlyFilterColumns(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h index b7d87c2b3812..c4ff8ac695ff 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h @@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase { NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; std::shared_ptr ReadStats; - TReadMetadata(const ui64 pathId, const std::shared_ptr info, const TSnapshot& snapshot, const ESorting sorting, - const TProgramContainer& ssaProgram, const std::shared_ptr& scanCursor) - : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor) - , PathId(pathId) - , ReadStats(std::make_shared()) { - } + TReadMetadata(const std::shared_ptr& schemaIndex, const TReadDescription& read); virtual std::vector GetKeyYqlSchema() const override { return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h index 45cbf7c2c951..a21652522430 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h @@ -85,6 +85,15 @@ class TColumnsSetIds { } return result; } + + TColumnsSetIds& operator+=(const TColumnsSetIds& external) { + return (*this = *this + external); + } + + TColumnsSetIds& operator-=(const TColumnsSetIds& external) { + return (*this = *this - external); + } + bool IsEmpty() const { return ColumnIds.empty(); } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h index fa6f44309f18..7dd9730ef281 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -126,7 +126,7 @@ class TOptionalAssemblerStep: public IFetchingStep { class TColumnBlobsFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; - TColumnsSetIds Columns; + YDB_READONLY_DEF(TColumnsSetIds, Columns); protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index 937f049a43b9..dd52eea54909 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -92,12 +92,9 @@ TString TFetchingScript::DebugString() const { return sb; } -TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) { -} - -void TFetchingScript::Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { +void TFetchingScriptBuilder::AddAllocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { if (Steps.size() == 0) { - AddStep(entityIds, mType, stage); + AddStep(std::make_shared(entityIds, mType, stage)); } else { std::optional addIndex; for (i32 i = Steps.size() - 1; i >= 0; --i) { @@ -130,42 +127,52 @@ TString IFetchingStep::DebugString() const { return sb; } -bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { - auto actualColumns = GetNotFetchedAlready(columns); - FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns; - if (!actualColumns.IsEmpty()) { - script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob); - script.AddStep(std::make_shared(actualColumns)); - return true; +TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context) + : GuaranteeNotOptional(context.GetMergeColumns()) + , FullSchema(context.GetReadMetadata()->GetResultSchema()) { +} + +void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { + auto actualColumns = columns - AddedFetchingColumns; + AddedFetchingColumns += columns; + if (actualColumns.IsEmpty()) { + return; + } + if (Steps.size() && std::dynamic_pointer_cast(Steps.back())) { + TColumnsSetIds fetchingColumns = actualColumns + std::dynamic_pointer_cast(Steps.back())->GetColumns(); + Steps.pop_back(); + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob); + AddStep(std::make_shared(fetchingColumns)); + } else { + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob); + AddStep(std::make_shared(actualColumns)); } - return false; } -bool TColumnsAccumulator::AddAssembleStep( - TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { - auto actualColumns = columns - AssemblerReadyColumns; - AssemblerReadyColumns = AssemblerReadyColumns + columns; +void TFetchingScriptBuilder::AddAssembleStep( + const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { + auto actualColumns = columns - AddedAssembleColumns; + AddedAssembleColumns += columns; if (actualColumns.IsEmpty()) { - return false; + return; } auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); if (sequential) { const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); + AddAllocation(notSequentialColumnIds, stage, EMemType::Raw); std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); + AddStep(std::make_shared(cross, purposeId)); *actualSet = *actualSet - *cross; } if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); + AddAllocation(notSequentialColumnIds, stage, EMemType::RawSequential); + AddStep(std::make_shared(actualSet, purposeId)); } } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); + AddStep(std::make_shared(actualSet, purposeId)); } - return true; } TConclusion TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 0656a2b9b858..056742d5e116 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -95,6 +95,10 @@ class IFetchingStep: public TNonCopyable { Signals.AddBytes(size); } + virtual bool Merge(const std::shared_ptr& /*nextStep*/) { + return false; + } + virtual ~IFetchingStep() = default; [[nodiscard]] TConclusion ExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { @@ -115,15 +119,16 @@ class IFetchingStep: public TNonCopyable { class TFetchingScript { private: - YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); + YDB_READONLY_DEF(TString, BranchName); std::vector> Steps; TAtomic StartInstant; TAtomic FinishInstant; public: - TFetchingScript(const TSpecialReadContext& context); - - void Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + TFetchingScript(const TString& branchName, std::vector>&& steps) + : BranchName(branchName) + , Steps(std::move(steps)) { + } void AddStepDataSize(const ui32 index, const ui64 size) { GetStep(index)->AddDataSize(size); @@ -145,26 +150,6 @@ class TFetchingScript { return Steps[index]; } - template - std::shared_ptr AddStep(Args... args) { - auto result = std::make_shared(args...); - Steps.emplace_back(result); - return result; - } - - template - std::shared_ptr InsertStep(const ui32 index, Args... args) { - AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); - auto result = std::make_shared(args...); - Steps.insert(Steps.begin() + index, result); - return result; - } - - void AddStep(const std::shared_ptr& step) { - AFL_VERIFY(step); - Steps.emplace_back(step); - } - bool IsFinished(const ui32 currentStepIdx) const { AFL_VERIFY(currentStepIdx <= Steps.size()); return currentStepIdx == Steps.size(); @@ -231,26 +216,41 @@ class TFetchingScriptOwner: TNonCopyable { } }; -class TColumnsAccumulator { +class TFetchingScriptBuilder { private: - TColumnsSetIds FetchingReadyColumns; - TColumnsSetIds AssemblerReadyColumns; - ISnapshotSchema::TPtr FullSchema; std::shared_ptr GuaranteeNotOptional; + ISnapshotSchema::TPtr FullSchema; + + YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); + std::vector> Steps; + YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns); + YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns); + +private: + void AddAllocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + + template + std::shared_ptr InsertStep(const ui32 index, Args... args) { + AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); + auto result = std::make_shared(args...); + Steps.insert(Steps.begin() + index, result); + return result; + } public: - TColumnsAccumulator(const std::shared_ptr& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema) - : FullSchema(fullSchema) - , GuaranteeNotOptional(guaranteeNotOptional) { + TFetchingScriptBuilder(const TSpecialReadContext& context); + + std::shared_ptr Build()&& { + return std::make_shared(BranchName, std::move(Steps)); } - TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const { - return columns - FetchingReadyColumns; + void AddStep(const std::shared_ptr& step) { + AFL_VERIFY(step); + Steps.emplace_back(step); } - bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); - bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, - const bool sequential); + void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); + void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential); }; class TFetchingScriptCursor { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 6b4fdd179183..3f2f3295a9e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -15,6 +15,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/scheme + yql/essentials/minikql ) GENERATE_ENUM_SERIALIZATION(columns_set.h) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp index ef01545efc96..d3654d881f57 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp @@ -35,8 +35,7 @@ NKikimr::TConclusion> TIndexScannerConstructo TDataStorageAccessor dataAccessor(insertTable, index); AFL_VERIFY(read.PathId); - auto readMetadata = std::make_shared(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(), - IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr); + auto readMetadata = std::make_shared(index->CopyVersionedIndexPtr(), read); auto initResult = readMetadata->Init(self, read, dataAccessor); if (!initResult) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index f2001c538487..50d188e82bdb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -28,12 +28,13 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c auto source = std::static_pointer_cast(sourceExt); if (source->NeedAccessorsFetching()) { if (!AskAccumulatorsScript) { - AskAccumulatorsScript = std::make_shared(*this); + NCommon::TFetchingScriptBuilder acc(*this); if (ui64 size = source->PredictAccessorsMemory()) { - AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); + acc.AddStep(std::make_shared(size, EStageFeaturesIndexes::Accessors)); } - AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*GetFFColumns()); + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared(*GetFFColumns())); + AskAccumulatorsScript = std::move(acc).Build(); } return AskAccumulatorsScript; } @@ -72,126 +73,123 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c if (result.HasScript()) { return result.GetScriptVerified(); } else { - std::shared_ptr result = std::make_shared(*this); - result->SetBranchName("FAKE"); - result->AddStep(); - return result; + NCommon::TFetchingScriptBuilder acc(*this); + acc.SetBranchName("FAKE"); + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } } } std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { - std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); + NCommon::TFetchingScriptBuilder acc(*this); if (!!IndexChecker && useIndexes && exclusiveSource) { - result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); - result->AddStep(std::make_shared(IndexChecker)); + acc.AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); + acc.AddStep(std::make_shared(IndexChecker)); } bool hasFilterSharding = false; if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { hasFilterSharding = true; - TColumnsSetIds columnsFetch = *GetShardingColumns(); + acc.AddFetchingStep(*GetShardingColumns(), EStageFeaturesIndexes::Filter); if (!exclusiveSource) { - columnsFetch = columnsFetch + *GetPKColumns() + *GetSpecColumns(); + acc.AddFetchingStep(*GetPKColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) { - result->SetBranchName("simple"); - TColumnsSetIds columnsFetch = *GetFFColumns(); + acc.SetBranchName("simple"); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Fetching); } if (needSnapshots) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Fetching); } if (!exclusiveSource) { - columnsFetch = columnsFetch + *GetMergeColumns(); + acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Fetching); } else { - if (columnsFetch.GetColumnsCount() == 1 && GetSpecColumns()->Contains(columnsFetch) && !hasFilterSharding) { + if (acc.GetAddedFetchingColumns().GetColumnsCount() == 1 && GetSpecColumns()->Contains(acc.GetAddedFetchingColumns()) && !hasFilterSharding) { return nullptr; } } - if (columnsFetch.GetColumnsCount() || hasFilterSharding || needFilterDeletion) { - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Fetching); + if (acc.GetAddedFetchingColumns().GetColumnsCount() || hasFilterSharding || needFilterDeletion) { if (needSnapshots) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); } if (!exclusiveSource) { - acc.AddAssembleStep(*result, *GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); } if (needSnapshots) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); + acc.AddStep(std::make_shared()); } - acc.AddAssembleStep(*result, columnsFetch, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } else { return nullptr; } } else if (exclusiveSource) { - result->SetBranchName("exclusive"); - TColumnsSet columnsFetch = *GetEFColumns(); + acc.SetBranchName("exclusive"); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *GetPredicateColumns(); + acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); } - AFL_VERIFY(columnsFetch.GetColumnsCount()); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } - acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } else { - result->SetBranchName("merge"); - TColumnsSet columnsFetch = *GetMergeColumns() + *GetEFColumns(); + acc.SetBranchName("merge"); + acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } - AFL_VERIFY(columnsFetch.GetColumnsCount()); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - acc.AddAssembleStep(*result, *GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); if (needSnapshots) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } - acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } - result->AddStep(); - return result; + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp index b7034b00d179..07b1a7653b12 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp @@ -34,8 +34,7 @@ NKikimr::TConclusion> TIndexScannerConstructo TDataStorageAccessor dataAccessor(insertTable, index); AFL_VERIFY(read.PathId); - auto readMetadata = std::make_shared(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(), - IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor()); + auto readMetadata = std::make_shared(index->CopyVersionedIndexPtr(), read); auto initResult = readMetadata->Init(self, read, dataAccessor); if (!initResult) { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 3e62f0bd61f4..10548370a005 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -13,11 +13,12 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c GetFFColumns()->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX); if (!dontNeedColumns && !source->GetStageData().HasPortionAccessor()) { if (!AskAccumulatorsScript) { - AskAccumulatorsScript = std::make_shared(*this); - AskAccumulatorsScript->AddStep( - source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors); - AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*GetFFColumns()); + NCommon::TFetchingScriptBuilder acc(*this); + acc.AddStep(std::make_shared + (source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors)); + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared(*GetFFColumns())); + AskAccumulatorsScript = std::move(acc).Build(); } return AskAccumulatorsScript; } @@ -57,68 +58,63 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { - std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); + NCommon::TFetchingScriptBuilder acc(*this); if (!!IndexChecker && useIndexes) { - result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); - result->AddStep(std::make_shared(IndexChecker)); + acc.AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); + acc.AddStep(std::make_shared(IndexChecker)); } if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { const TColumnsSetIds columnsFetch = *GetShardingColumns(); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddFetchingStep(columnsFetch, EStageFeaturesIndexes::Filter); + acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } { - result->SetBranchName("exclusive"); - TColumnsSet columnsFetch; + acc.SetBranchName("exclusive"); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *GetPredicateColumns(); - } - - if (columnsFetch.GetColumnsCount()) { - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified(); for (ui32 stepIdx = 0; stepIdx < chainProgram->GetProcessors().size(); ++stepIdx) { auto& step = chainProgram->GetProcessors()[stepIdx]; if (!chainProgram->GetLastOriginalDataFilter() && GetReadMetadata()->HasLimit() && step->GetProcessorType() == NArrow::NSSA::EProcessorType::Projection) { - result->AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); + acc.AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); } - result->AddStep(std::make_shared(step)); - result->AddStep(std::make_shared(step)); - result->AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); if (step->GetProcessorType() == NArrow::NSSA::EProcessorType::Filter && GetReadMetadata()->HasLimit() && chainProgram->GetLastOriginalDataFilter() == stepIdx) { - result->AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); + acc.AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); } } } - result->AddStep(); - result->AddStep(); - return result; + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index ee4dbdb5d376..ae9caccadf19 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -8,8 +8,6 @@ #include -#include - namespace NKikimr::NOlap::NReader::NSimple { TConclusion TIndexBlobsFetchingStep::DoExecuteInplace( @@ -170,7 +168,7 @@ TConclusion TBuildResultStep::DoExecuteInplace(const std::shared_ptr TPrepareResultStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - std::shared_ptr plan = std::make_shared(*source->GetContext()); + NCommon::TFetchingScriptBuilder acc(*source->GetContext()); if (source->IsSourceInMemory()) { AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1); } @@ -179,8 +177,9 @@ TConclusion TPrepareResultStep::DoExecuteInplace(const std::shared_ptrGetSourceId(), i.GetIndexStart(), i.GetRecordsCount())) { continue; } - plan->AddStep(i.GetIndexStart(), i.GetRecordsCount()); + acc.AddStep(std::make_shared(i.GetIndexStart(), i.GetRecordsCount())); } + auto plan = std::move(acc).Build(); AFL_VERIFY(!plan->IsFinished(0)); source->InitFetchingPlan(plan); From 483ff2c3b97e9cab5daacdc1c8488aabe8b788f2 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Mon, 3 Mar 2025 19:29:35 +0300 Subject: [PATCH 2/9] clean up --- .../engines/reader/common_reader/iterator/fetching.h | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 056742d5e116..15ebf3520882 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -95,10 +95,6 @@ class IFetchingStep: public TNonCopyable { Signals.AddBytes(size); } - virtual bool Merge(const std::shared_ptr& /*nextStep*/) { - return false; - } - virtual ~IFetchingStep() = default; [[nodiscard]] TConclusion ExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { From 1961ed86d0ab18e2549085e8a58dc190b2142fc5 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 4 Mar 2025 14:11:49 +0300 Subject: [PATCH 3/9] rename --- ydb/core/tx/columnshard/engines/reader/common/description.h | 2 +- .../engines/reader/common_reader/constructor/read_metadata.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index e81333389d12..6c9798e0994b 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -30,7 +30,7 @@ struct TReadDescription { // List of columns std::vector ColumnIds; - const std::shared_ptr& GetScanCursor() const { + const std::shared_ptr& GetScanCursorOptional() const { return ScanCursor; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index 8239e3e161ac..5ce5b2ebc9e8 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -44,7 +44,7 @@ TConclusionStatus TReadMetadata::Init( TReadMetadata::TReadMetadata(const std::shared_ptr& schemaIndex, const TReadDescription& read) : TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, - read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursor()) + read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursorOptional()) , PathId(read.PathId) , ReadStats(std::make_shared()) { } From 2790c4134c0fcafaf38e086fa821bd482c6c163b Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 4 Mar 2025 16:08:19 +0300 Subject: [PATCH 4/9] add ut --- .../simple_reader/iterator/fetching.cpp | 10 +++ .../reader/simple_reader/iterator/fetching.h | 9 +-- .../tx/columnshard/engines/ut/ut_script.cpp | 61 +++++++++++++++++++ ydb/core/tx/columnshard/engines/ut/ya.make | 1 + 4 files changed, 74 insertions(+), 7 deletions(-) create mode 100644 ydb/core/tx/columnshard/engines/ut/ut_script.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index ae9caccadf19..63fd491cb010 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -10,6 +10,16 @@ namespace NKikimr::NOlap::NReader::NSimple { +TConclusion IFetchingStep::DoExecuteInplace( + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const { + const auto source = std::static_pointer_cast(sourceExt); + return DoExecuteInplace(source, step); +} + +ui64 IFetchingStep::GetProcessingDataSize(const std::shared_ptr& source) const { + return GetProcessingDataSize(std::static_pointer_cast(source)); +} + TConclusion TIndexBlobsFetchingStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { return !source->StartFetchingIndexes(source, step, Indexes); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index 460ec3b44190..d6538ae2fdf6 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -32,14 +32,9 @@ class IFetchingStep: public NCommon::IFetchingStep { } virtual TConclusion DoExecuteInplace( - const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final { - const auto source = std::static_pointer_cast(sourceExt); - return DoExecuteInplace(source, step); - } + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final; - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final { - return GetProcessingDataSize(std::static_pointer_cast(source)); - } + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final; public: using TBase::TBase; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp new file mode 100644 index 000000000000..8b78eb5762a1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -0,0 +1,61 @@ +#include + +#include +#include +#include +#include +#include + +/* + TReadContext(const std::shared_ptr& storagesManager, + const std::shared_ptr& dataAccessorsManager, + const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, + const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, + const ui64 scanId); +*/ + +using namespace NKikimr; +using namespace NKikimr::NOlap; +using namespace NKikimr::NOlap::NReader; + +Y_UNIT_TEST_SUITE(TestScript) { + NSimple::TSpecialReadContext MakeTestReadContext(const THashMap& columns, const std::vector pkIds = {0}) { + auto columnsCopy = columns; + for (ui64 i = 0; i < pkIds.size(); ++i) { + TValidator::CheckNotNull(columnsCopy.FindPtr(pkIds[i]))->KeyOrder = i; + } + + const auto versionedIndex = std::make_shared(); + auto cache = std::make_shared(); + TIndexInfo info = TIndexInfo::BuildDefault(TTestStoragesManager::GetInstance(), columns, pkIds); + AFL_VERIFY(info.BuildFromProto({}, nullptr, cache)); + versionedIndex->AddIndex(TSnapshot::Zero(), cache->UpsertIndexInfo(0, std::move(info))); + + const TReadDescription read(TSnapshot::Max(), false); + + const auto base = std::make_shared(TTestStoragesManager::GetInstance(), nullptr, + NKikimr::NColumnShard::TConcreteScanCounters(NKikimr::NColumnShard::TScanCounters()), + std::make_shared(versionedIndex, read), NActors::TActorId(), NActors::TActorId(), NActors::TActorId(), + NKikimr::NOlap::NReader::TComputeShardingPolicy(), 0); + return NSimple::TSpecialReadContext(base); + } + + Y_UNIT_TEST(StepMerging) { + NCommon::TFetchingScriptBuilder acc( + MakeTestReadContext({ { 0, NTable::TColumn("c0", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, + { 1, NTable::TColumn("c1", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, + { 2, NTable::TColumn("c2", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") } })); + + acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Filter); + acc.AddAssembleStep(std::vector({ 0 }), "", NSimple::EStageFeaturesIndexes::Filter, false); + + acc.AddFetchingStep(std::vector({ 0, 1 }), NSimple::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 1, 2 }), NSimple::EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Merge); + + auto script = std::move(acc).Build(); + UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), ""); + } +} diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make index 5b6490e05838..215cfe63763d 100644 --- a/ydb/core/tx/columnshard/engines/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/ut/ya.make @@ -34,6 +34,7 @@ SRCS( ut_insert_table.cpp ut_logs_engine.cpp ut_program.cpp + ut_script.cpp helper.cpp ) From 900cbbd369b0d97853ca17142a9a0881058b37a6 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 4 Mar 2025 16:15:43 +0300 Subject: [PATCH 5/9] fix ut --- ydb/core/tx/columnshard/engines/ut/ut_script.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index 8b78eb5762a1..8b0916e62d05 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -19,10 +19,9 @@ using namespace NKikimr::NOlap; using namespace NKikimr::NOlap::NReader; Y_UNIT_TEST_SUITE(TestScript) { - NSimple::TSpecialReadContext MakeTestReadContext(const THashMap& columns, const std::vector pkIds = {0}) { - auto columnsCopy = columns; + NSimple::TSpecialReadContext MakeTestReadContext(THashMap columns, const std::vector pkIds = {0}) { for (ui64 i = 0; i < pkIds.size(); ++i) { - TValidator::CheckNotNull(columnsCopy.FindPtr(pkIds[i]))->KeyOrder = i; + TValidator::CheckNotNull(columns.FindPtr(pkIds[i]))->KeyOrder = i; } const auto versionedIndex = std::make_shared(); From 978d940f32b14899d829452fe7e358610bb31b76 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 4 Mar 2025 20:09:11 +0300 Subject: [PATCH 6/9] add ut --- .../common_reader/iterator/fetching.cpp | 7 +-- .../reader/common_reader/iterator/fetching.h | 11 +++- .../tx/columnshard/engines/ut/ut_script.cpp | 55 +++++++------------ 3 files changed, 34 insertions(+), 39 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index dd52eea54909..6f73d8cc5a7e 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -72,11 +72,11 @@ TConclusion TFetchingScriptCursor::Execute(const std::shared_ptrGetSumDuration() > TDuration::MilliSeconds(10)) { + if (!onlyLongSteps || i->GetSumDuration() > TDuration::MilliSeconds(10)) { sbBranch << "{" << i->DebugString() << "};"; } } @@ -128,8 +128,7 @@ TString IFetchingStep::DebugString() const { } TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context) - : GuaranteeNotOptional(context.GetMergeColumns()) - , FullSchema(context.GetReadMetadata()->GetResultSchema()) { + : TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) { } void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 15ebf3520882..21717f9f3c45 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -139,7 +139,7 @@ class TFetchingScript { AtomicCas(&StartInstant, TMonotonic::Now().MicroSeconds(), 0); } - TString DebugString() const; + TString DebugString(const bool onlyLongSteps = true) const; const std::shared_ptr& GetStep(const ui32 index) const { AFL_VERIFY(index < Steps.size()); @@ -222,6 +222,11 @@ class TFetchingScriptBuilder { YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns); YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns); + TFetchingScriptBuilder(const ISnapshotSchema::TPtr& schema, const std::shared_ptr& guaranteeNotOptional) + : GuaranteeNotOptional(guaranteeNotOptional) + , FullSchema(schema) { + } + private: void AddAllocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); @@ -247,6 +252,10 @@ class TFetchingScriptBuilder { void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential); + + static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr guaranteeNotOptional = nullptr) { + return TFetchingScriptBuilder(schema, guaranteeNotOptional ? guaranteeNotOptional : std::make_shared()); + } }; class TFetchingScriptCursor { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index 8b0916e62d05..7cf132d28231 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -1,60 +1,47 @@ #include #include -#include -#include #include -#include - -/* - TReadContext(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, - const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, - const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, - const ui64 scanId); -*/ +#include using namespace NKikimr; using namespace NKikimr::NOlap; using namespace NKikimr::NOlap::NReader; Y_UNIT_TEST_SUITE(TestScript) { - NSimple::TSpecialReadContext MakeTestReadContext(THashMap columns, const std::vector pkIds = {0}) { + std::shared_ptr MakeTestSchema(THashMap columns, const std::vector pkIds = { 0 }) { for (ui64 i = 0; i < pkIds.size(); ++i) { TValidator::CheckNotNull(columns.FindPtr(pkIds[i]))->KeyOrder = i; } - const auto versionedIndex = std::make_shared(); auto cache = std::make_shared(); TIndexInfo info = TIndexInfo::BuildDefault(TTestStoragesManager::GetInstance(), columns, pkIds); - AFL_VERIFY(info.BuildFromProto({}, nullptr, cache)); - versionedIndex->AddIndex(TSnapshot::Zero(), cache->UpsertIndexInfo(0, std::move(info))); - - const TReadDescription read(TSnapshot::Max(), false); - - const auto base = std::make_shared(TTestStoragesManager::GetInstance(), nullptr, - NKikimr::NColumnShard::TConcreteScanCounters(NKikimr::NColumnShard::TScanCounters()), - std::make_shared(versionedIndex, read), NActors::TActorId(), NActors::TActorId(), NActors::TActorId(), - NKikimr::NOlap::NReader::TComputeShardingPolicy(), 0); - return NSimple::TSpecialReadContext(base); + return std::make_shared(cache->UpsertIndexInfo(0, std::move(info)), TSnapshot(1, 1)); } Y_UNIT_TEST(StepMerging) { - NCommon::TFetchingScriptBuilder acc( - MakeTestReadContext({ { 0, NTable::TColumn("c0", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, + NCommon::TFetchingScriptBuilder acc = NCommon::TFetchingScriptBuilder::MakeForTests( + MakeTestSchema({ { 0, NTable::TColumn("c0", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, { 1, NTable::TColumn("c1", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, { 2, NTable::TColumn("c2", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") } })); - acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(std::vector({ 0 }), "", NSimple::EStageFeaturesIndexes::Filter, false); - - acc.AddFetchingStep(std::vector({ 0, 1 }), NSimple::EStageFeaturesIndexes::Filter); - acc.AddFetchingStep(std::vector({ 1, 2 }), NSimple::EStageFeaturesIndexes::Fetching); - acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Fetching); - acc.AddFetchingStep(std::vector({ 0 }), NSimple::EStageFeaturesIndexes::Merge); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddAssembleStep(std::vector({ 0 }), "", NCommon::EStageFeaturesIndexes::Filter, false); + acc.AddFetchingStep(std::vector({ 0, 1 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 1, 2 }), NCommon::EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(std::vector({ 0, 1, 2 }), "", NCommon::EStageFeaturesIndexes::Fetching, false); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Merge); auto script = std::move(acc).Build(); - UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), ""); + UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(false), + "{branch:UNDEFINED;steps_10Ms:[" + "{name=ALLOCATE_MEMORY::Filter;duration=0.000000s;size=0;details={stage=Filter;};};" + "{name=ALLOCATE_MEMORY::Fetching;duration=0.000000s;size=0;details={stage=Fetching;};};" + "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=0;};};" + "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=0;column_names=c0;);;};};" + "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=1,2;};};" + "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};]}"); } } From 69308b28c83dcb9527940209eed18253b2ab3d43 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Thu, 6 Mar 2025 13:04:00 +0300 Subject: [PATCH 7/9] fix ut --- ydb/core/tx/columnshard/engines/ut/ut_script.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index 7cf132d28231..7e8bb304fe0f 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -2,6 +2,7 @@ #include #include +#include #include using namespace NKikimr; @@ -28,20 +29,25 @@ Y_UNIT_TEST_SUITE(TestScript) { acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); acc.AddAssembleStep(std::vector({ 0 }), "", NCommon::EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); acc.AddFetchingStep(std::vector({ 0, 1 }), NCommon::EStageFeaturesIndexes::Filter); acc.AddFetchingStep(std::vector({ 1, 2 }), NCommon::EStageFeaturesIndexes::Fetching); acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Fetching); acc.AddAssembleStep(std::vector({ 0, 1, 2 }), "", NCommon::EStageFeaturesIndexes::Fetching, false); + acc.AddStep(std::make_shared()); acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Merge); auto script = std::move(acc).Build(); UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(false), "{branch:UNDEFINED;steps_10Ms:[" "{name=ALLOCATE_MEMORY::Filter;duration=0.000000s;size=0;details={stage=Filter;};};" - "{name=ALLOCATE_MEMORY::Fetching;duration=0.000000s;size=0;details={stage=Fetching;};};" "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=0;};};" "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=0;column_names=c0;);;};};" + "{name=DELETION;duration=0.000000s;size=0;details={};};" + "{name=ALLOCATE_MEMORY::Filter;duration=0.000000s;size=0;details={stage=Filter;};};" + "{name=ALLOCATE_MEMORY::Fetching;duration=0.000000s;size=0;details={stage=Fetching;};};" "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=1,2;};};" - "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};]}"); + "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};" + "{name=DELETION;duration=0.000000s;size=0;details={};};]}"); } } From b76a755ca3446fff851eea78f2e600d73e07a1f7 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Thu, 6 Mar 2025 16:55:18 +0300 Subject: [PATCH 8/9] simplify ut check --- .../common_reader/iterator/fetch_steps.h | 8 ++++- .../common_reader/iterator/fetching.cpp | 29 +++++++++++++++---- .../reader/common_reader/iterator/fetching.h | 9 +++--- .../reader/plain_reader/iterator/context.cpp | 2 +- .../reader/simple_reader/iterator/context.cpp | 2 +- .../tx/columnshard/engines/ut/ut_script.cpp | 20 ++++++------- 6 files changed, 47 insertions(+), 23 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h index 7dd9730ef281..0123678d75c9 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -43,7 +43,13 @@ class TAllocateMemoryStep: public IFetchingStep { virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; virtual TString DoDebugString() const override { - return TStringBuilder() << "stage=" << StageIndex << ";"; + std::vector columns; + for (const auto& pack : Packs) { + for (const ui32 columnId : pack.GetColumns().GetColumnIds()) { + columns.emplace_back(TStringBuilder() << pack.GetMemType() << ':' << columnId); + } + } + return TStringBuilder() << "stage=" << StageIndex << ";column_ids=[" << JoinSeq(',', columns) << "];"; } public: diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index 6f73d8cc5a7e..d2a44b42f2ec 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -72,12 +72,25 @@ TConclusion TFetchingScriptCursor::Execute(const std::shared_ptrGetSumDuration() > TDuration::MilliSeconds(10)) { - sbBranch << "{" << i->DebugString() << "};"; + sbBranch << "{" << i->DebugString() << "};"; + } + if (!sbBranch) { + return ""; + } + sb << "{branch:" << BranchName << ";steps:[" << sbBranch << "]}"; + return sb; +} + +TString TFetchingScript::ProfileDebugString() const { + TStringBuilder sb; + TStringBuilder sbBranch; + for (auto&& i : Steps) { + if (i->GetSumDuration() > TDuration::MilliSeconds(10)) { + sbBranch << "{" << i->DebugString(true) << "};"; } } if (!sbBranch) { @@ -120,10 +133,14 @@ void TFetchingScriptBuilder::AddAllocation(const std::set& entityIds, cons } } -TString IFetchingStep::DebugString() const { +TString IFetchingStep::DebugString(const bool stats) const { TStringBuilder sb; - sb << "name=" << Name << ";duration=" << GetSumDuration() << ";" - << "size=" << 1e-9 * GetSumSize() << ";details={" << DoDebugString() << "};"; + sb << "name=" << Name; + if (stats) { + sb << ";duration=" << GetSumDuration() << ";" + << "size=" << 1e-9 * GetSumSize(); + } + sb << ";details={" << DoDebugString() << "};"; return sb; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 21717f9f3c45..d9f6d52eca2b 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -110,7 +110,7 @@ class IFetchingStep: public TNonCopyable { , Signals(TFetchingStepsSignalsCollection::GetSignals(name)) { } - TString DebugString() const; + TString DebugString(const bool stats = false) const; }; class TFetchingScript { @@ -139,7 +139,8 @@ class TFetchingScript { AtomicCas(&StartInstant, TMonotonic::Now().MicroSeconds(), 0); } - TString DebugString(const bool onlyLongSteps = true) const; + TString DebugString() const; + TString ProfileDebugString() const; const std::shared_ptr& GetStep(const ui32 index) const { AFL_VERIFY(index < Steps.size()); @@ -170,9 +171,9 @@ class TFetchingScriptOwner: TNonCopyable { return Script; } - TString DebugString() const { + TString ProfileDebugString() const { if (Script) { - return TStringBuilder() << Script->DebugString() << Endl; + return TStringBuilder() << Script->ProfileDebugString() << Endl; } else { return TStringBuilder() << "NO_SCRIPT" << Endl; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index 50d188e82bdb..82362b1cf9e9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -205,7 +205,7 @@ TString TSpecialReadContext::ProfileDebugString() const { for (ui32 i = 0; i < (1 << 6); ++i) { auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)]; if (script.HasScript()) { - sb << script.DebugString() << ";"; + sb << script.ProfileDebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 10548370a005..3a41ccab53fd 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -130,7 +130,7 @@ TString TSpecialReadContext::ProfileDebugString() const { for (ui32 i = 0; i < (1 << 5); ++i) { auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)]; if (script.HasScript()) { - sb << script.DebugString() << ";"; + sb << script.ProfileDebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index 7e8bb304fe0f..3cedfa6e8c15 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -38,16 +38,16 @@ Y_UNIT_TEST_SUITE(TestScript) { acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Merge); auto script = std::move(acc).Build(); - UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(false), + UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), "{branch:UNDEFINED;steps_10Ms:[" - "{name=ALLOCATE_MEMORY::Filter;duration=0.000000s;size=0;details={stage=Filter;};};" - "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=0;};};" - "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=0;column_names=c0;);;};};" - "{name=DELETION;duration=0.000000s;size=0;details={};};" - "{name=ALLOCATE_MEMORY::Filter;duration=0.000000s;size=0;details={stage=Filter;};};" - "{name=ALLOCATE_MEMORY::Fetching;duration=0.000000s;size=0;details={stage=Fetching;};};" - "{name=FETCHING_COLUMNS;duration=0.000000s;size=0;details={columns=1,2;};};" - "{name=ASSEMBLER;duration=0.000000s;size=0;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};" - "{name=DELETION;duration=0.000000s;size=0;details={};};]}"); + "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:0,Raw:0];};};" + "{name=FETCHING_COLUMNS;details={columns=0;};};" + "{name=ASSEMBLER;details={columns=(column_ids=0;column_names=c0;);;};};" + "{name=DELETION;details={};};" + "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:1];};};" + "{name=ALLOCATE_MEMORY::Fetching;details={stage=Fetching;column_ids=[Blob:2,Raw:1,Raw:2];};};" + "{name=FETCHING_COLUMNS;details={columns=1,2;};};" + "{name=ASSEMBLER;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};" + "{name=DELETION;details={};};]}"); } } From 05dda985aae9a0325162e5a572f9a2e1d7d8e70c Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Thu, 6 Mar 2025 16:58:59 +0300 Subject: [PATCH 9/9] fix --- ydb/core/tx/columnshard/engines/ut/ut_script.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp index 3cedfa6e8c15..a50388e8bfb9 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -39,7 +39,7 @@ Y_UNIT_TEST_SUITE(TestScript) { auto script = std::move(acc).Build(); UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), - "{branch:UNDEFINED;steps_10Ms:[" + "{branch:UNDEFINED;steps:[" "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:0,Raw:0];};};" "{name=FETCHING_COLUMNS;details={columns=0;};};" "{name=ASSEMBLER;details={columns=(column_ids=0;column_names=c0;);;};};"