diff --git a/ydb/core/tx/columnshard/counters/portion_index.cpp b/ydb/core/tx/columnshard/counters/portion_index.cpp index 0a148c302c20..5e3c8ff7ce28 100644 --- a/ydb/core/tx/columnshard/counters/portion_index.cpp +++ b/ydb/core/tx/columnshard/counters/portion_index.cpp @@ -25,7 +25,7 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) { { auto findClass = TotalStats.find(portionClass); - AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findClass != TotalStats.end())("path_id", portion.GetPathId()); findClass->second.RemovePortion(portion); if (findClass->second.IsEmpty()) { TotalStats.erase(findClass); @@ -34,9 +34,9 @@ void TPortionIndexStats::RemovePortion(const NOlap::TPortionInfo& portion) { { auto findPathId = StatsByPathId.find(portion.GetPathId()); - AFL_VERIFY(!findPathId.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findPathId != StatsByPathId.end())("path_id", portion.GetPathId()); auto findClass = findPathId->second.find(portionClass); - AFL_VERIFY(!findClass.IsEnd())("path_id", portion.GetPathId()); + AFL_VERIFY(findClass != findPathId->second.end())("path_id", portion.GetPathId()); findClass->second.RemovePortion(portion); if (findClass->second.IsEmpty()) { findPathId->second.erase(findClass); diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index 9be71450515e..6c9798e0994b 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -30,8 +30,7 @@ struct TReadDescription { // List of columns std::vector ColumnIds; - const std::shared_ptr& GetScanCursor() const { - AFL_VERIFY(ScanCursor); + const std::shared_ptr& GetScanCursorOptional() const { return ScanCursor; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp index dfb8fc36f341..5ce5b2ebc9e8 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp @@ -42,6 +42,13 @@ TConclusionStatus TReadMetadata::Init( return TConclusionStatus::Success(); } +TReadMetadata::TReadMetadata(const std::shared_ptr& schemaIndex, const TReadDescription& read) + : TBase(schemaIndex, read.PKRangesFilter->IsReverse() ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, + read.GetProgram(), schemaIndex->GetSchemaVerified(read.GetSnapshot()), read.GetSnapshot(), read.GetScanCursorOptional()) + , PathId(read.PathId) + , ReadStats(std::make_shared()) { +} + std::set TReadMetadata::GetEarlyFilterColumnIds() const { auto& indexInfo = ResultIndexSchema->GetIndexInfo(); const auto& ids = GetProgram().GetEarlyFilterColumns(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h index b7d87c2b3812..c4ff8ac695ff 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h @@ -117,12 +117,7 @@ class TReadMetadata: public TReadMetadataBase { NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE; std::shared_ptr ReadStats; - TReadMetadata(const ui64 pathId, const std::shared_ptr info, const TSnapshot& snapshot, const ESorting sorting, - const TProgramContainer& ssaProgram, const std::shared_ptr& scanCursor) - : TBase(info, sorting, ssaProgram, info->GetSchemaVerified(snapshot), snapshot, scanCursor) - , PathId(pathId) - , ReadStats(std::make_shared()) { - } + TReadMetadata(const std::shared_ptr& schemaIndex, const TReadDescription& read); virtual std::vector GetKeyYqlSchema() const override { return GetResultSchema()->GetIndexInfo().GetPrimaryKeyColumns(); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h index 45cbf7c2c951..a21652522430 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h @@ -85,6 +85,15 @@ class TColumnsSetIds { } return result; } + + TColumnsSetIds& operator+=(const TColumnsSetIds& external) { + return (*this = *this + external); + } + + TColumnsSetIds& operator-=(const TColumnsSetIds& external) { + return (*this = *this - external); + } + bool IsEmpty() const { return ColumnIds.empty(); } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h index fa6f44309f18..0123678d75c9 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h @@ -43,7 +43,13 @@ class TAllocateMemoryStep: public IFetchingStep { virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override; virtual TString DoDebugString() const override { - return TStringBuilder() << "stage=" << StageIndex << ";"; + std::vector columns; + for (const auto& pack : Packs) { + for (const ui32 columnId : pack.GetColumns().GetColumnIds()) { + columns.emplace_back(TStringBuilder() << pack.GetMemType() << ':' << columnId); + } + } + return TStringBuilder() << "stage=" << StageIndex << ";column_ids=[" << JoinSeq(',', columns) << "];"; } public: @@ -126,7 +132,7 @@ class TOptionalAssemblerStep: public IFetchingStep { class TColumnBlobsFetchingStep: public IFetchingStep { private: using TBase = IFetchingStep; - TColumnsSetIds Columns; + YDB_READONLY_DEF(TColumnsSetIds, Columns); protected: virtual TConclusion DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const override; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index 937f049a43b9..d2a44b42f2ec 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -73,11 +73,24 @@ TConclusion TFetchingScriptCursor::Execute(const std::shared_ptrDebugString() << "};"; + } + if (!sbBranch) { + return ""; + } + sb << "{branch:" << BranchName << ";steps:[" << sbBranch << "]}"; + return sb; +} + +TString TFetchingScript::ProfileDebugString() const { TStringBuilder sb; TStringBuilder sbBranch; for (auto&& i : Steps) { if (i->GetSumDuration() > TDuration::MilliSeconds(10)) { - sbBranch << "{" << i->DebugString() << "};"; + sbBranch << "{" << i->DebugString(true) << "};"; } } if (!sbBranch) { @@ -92,12 +105,9 @@ TString TFetchingScript::DebugString() const { return sb; } -TFetchingScript::TFetchingScript(const TSpecialReadContext& /*context*/) { -} - -void TFetchingScript::Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { +void TFetchingScriptBuilder::AddAllocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType) { if (Steps.size() == 0) { - AddStep(entityIds, mType, stage); + AddStep(std::make_shared(entityIds, mType, stage)); } else { std::optional addIndex; for (i32 i = Steps.size() - 1; i >= 0; --i) { @@ -123,49 +133,62 @@ void TFetchingScript::Allocation(const std::set& entityIds, const EStageFe } } -TString IFetchingStep::DebugString() const { +TString IFetchingStep::DebugString(const bool stats) const { TStringBuilder sb; - sb << "name=" << Name << ";duration=" << GetSumDuration() << ";" - << "size=" << 1e-9 * GetSumSize() << ";details={" << DoDebugString() << "};"; + sb << "name=" << Name; + if (stats) { + sb << ";duration=" << GetSumDuration() << ";" + << "size=" << 1e-9 * GetSumSize(); + } + sb << ";details={" << DoDebugString() << "};"; return sb; } -bool TColumnsAccumulator::AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { - auto actualColumns = GetNotFetchedAlready(columns); - FetchingReadyColumns = FetchingReadyColumns + (TColumnsSetIds)columns; - if (!actualColumns.IsEmpty()) { - script.Allocation(columns.GetColumnIds(), stage, EMemType::Blob); - script.AddStep(std::make_shared(actualColumns)); - return true; +TFetchingScriptBuilder::TFetchingScriptBuilder(const TSpecialReadContext& context) + : TFetchingScriptBuilder(context.GetReadMetadata()->GetResultSchema(), context.GetMergeColumns()) { +} + +void TFetchingScriptBuilder::AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage) { + auto actualColumns = columns - AddedFetchingColumns; + AddedFetchingColumns += columns; + if (actualColumns.IsEmpty()) { + return; + } + if (Steps.size() && std::dynamic_pointer_cast(Steps.back())) { + TColumnsSetIds fetchingColumns = actualColumns + std::dynamic_pointer_cast(Steps.back())->GetColumns(); + Steps.pop_back(); + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob); + AddStep(std::make_shared(fetchingColumns)); + } else { + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Blob); + AddStep(std::make_shared(actualColumns)); } - return false; } -bool TColumnsAccumulator::AddAssembleStep( - TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { - auto actualColumns = columns - AssemblerReadyColumns; - AssemblerReadyColumns = AssemblerReadyColumns + columns; +void TFetchingScriptBuilder::AddAssembleStep( + const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential) { + auto actualColumns = columns - AddedAssembleColumns; + AddedAssembleColumns += columns; if (actualColumns.IsEmpty()) { - return false; + return; } auto actualSet = std::make_shared(actualColumns.GetColumnIds(), FullSchema); if (sequential) { const auto notSequentialColumnIds = GuaranteeNotOptional->Intersect(*actualSet); if (notSequentialColumnIds.size()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::Raw); + AddAllocation(notSequentialColumnIds, stage, EMemType::Raw); std::shared_ptr cross = actualSet->BuildSamePtr(notSequentialColumnIds); - script.AddStep(cross, purposeId); + AddStep(std::make_shared(cross, purposeId)); *actualSet = *actualSet - *cross; } if (!actualSet->IsEmpty()) { - script.Allocation(notSequentialColumnIds, stage, EMemType::RawSequential); - script.AddStep(actualSet, purposeId); + AddAllocation(notSequentialColumnIds, stage, EMemType::RawSequential); + AddStep(std::make_shared(actualSet, purposeId)); } } else { - script.Allocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); - script.AddStep(actualSet, purposeId); + AddAllocation(actualColumns.GetColumnIds(), stage, EMemType::Raw); + AddStep(std::make_shared(actualSet, purposeId)); } - return true; } TConclusion TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& step) const { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h index 0656a2b9b858..d9f6d52eca2b 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h @@ -110,20 +110,21 @@ class IFetchingStep: public TNonCopyable { , Signals(TFetchingStepsSignalsCollection::GetSignals(name)) { } - TString DebugString() const; + TString DebugString(const bool stats = false) const; }; class TFetchingScript { private: - YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); + YDB_READONLY_DEF(TString, BranchName); std::vector> Steps; TAtomic StartInstant; TAtomic FinishInstant; public: - TFetchingScript(const TSpecialReadContext& context); - - void Allocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + TFetchingScript(const TString& branchName, std::vector>&& steps) + : BranchName(branchName) + , Steps(std::move(steps)) { + } void AddStepDataSize(const ui32 index, const ui64 size) { GetStep(index)->AddDataSize(size); @@ -139,32 +140,13 @@ class TFetchingScript { } TString DebugString() const; + TString ProfileDebugString() const; const std::shared_ptr& GetStep(const ui32 index) const { AFL_VERIFY(index < Steps.size()); return Steps[index]; } - template - std::shared_ptr AddStep(Args... args) { - auto result = std::make_shared(args...); - Steps.emplace_back(result); - return result; - } - - template - std::shared_ptr InsertStep(const ui32 index, Args... args) { - AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); - auto result = std::make_shared(args...); - Steps.insert(Steps.begin() + index, result); - return result; - } - - void AddStep(const std::shared_ptr& step) { - AFL_VERIFY(step); - Steps.emplace_back(step); - } - bool IsFinished(const ui32 currentStepIdx) const { AFL_VERIFY(currentStepIdx <= Steps.size()); return currentStepIdx == Steps.size(); @@ -189,9 +171,9 @@ class TFetchingScriptOwner: TNonCopyable { return Script; } - TString DebugString() const { + TString ProfileDebugString() const { if (Script) { - return TStringBuilder() << Script->DebugString() << Endl; + return TStringBuilder() << Script->ProfileDebugString() << Endl; } else { return TStringBuilder() << "NO_SCRIPT" << Endl; } @@ -231,26 +213,50 @@ class TFetchingScriptOwner: TNonCopyable { } }; -class TColumnsAccumulator { +class TFetchingScriptBuilder { private: - TColumnsSetIds FetchingReadyColumns; - TColumnsSetIds AssemblerReadyColumns; - ISnapshotSchema::TPtr FullSchema; std::shared_ptr GuaranteeNotOptional; + ISnapshotSchema::TPtr FullSchema; + + YDB_ACCESSOR(TString, BranchName, "UNDEFINED"); + std::vector> Steps; + YDB_READONLY_DEF(TColumnsSetIds, AddedFetchingColumns); + YDB_READONLY_DEF(TColumnsSetIds, AddedAssembleColumns); + + TFetchingScriptBuilder(const ISnapshotSchema::TPtr& schema, const std::shared_ptr& guaranteeNotOptional) + : GuaranteeNotOptional(guaranteeNotOptional) + , FullSchema(schema) { + } + +private: + void AddAllocation(const std::set& entityIds, const EStageFeaturesIndexes stage, const EMemType mType); + + template + std::shared_ptr InsertStep(const ui32 index, Args... args) { + AFL_VERIFY(index <= Steps.size())("index", index)("size", Steps.size()); + auto result = std::make_shared(args...); + Steps.insert(Steps.begin() + index, result); + return result; + } public: - TColumnsAccumulator(const std::shared_ptr& guaranteeNotOptional, const ISnapshotSchema::TPtr& fullSchema) - : FullSchema(fullSchema) - , GuaranteeNotOptional(guaranteeNotOptional) { + TFetchingScriptBuilder(const TSpecialReadContext& context); + + std::shared_ptr Build()&& { + return std::make_shared(BranchName, std::move(Steps)); } - TColumnsSetIds GetNotFetchedAlready(const TColumnsSetIds& columns) const { - return columns - FetchingReadyColumns; + void AddStep(const std::shared_ptr& step) { + AFL_VERIFY(step); + Steps.emplace_back(step); } - bool AddFetchingStep(TFetchingScript& script, const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); - bool AddAssembleStep(TFetchingScript& script, const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, - const bool sequential); + void AddFetchingStep(const TColumnsSetIds& columns, const EStageFeaturesIndexes stage); + void AddAssembleStep(const TColumnsSetIds& columns, const TString& purposeId, const EStageFeaturesIndexes stage, const bool sequential); + + static TFetchingScriptBuilder MakeForTests(ISnapshotSchema::TPtr schema, std::shared_ptr guaranteeNotOptional = nullptr) { + return TFetchingScriptBuilder(schema, guaranteeNotOptional ? guaranteeNotOptional : std::make_shared()); + } }; class TFetchingScriptCursor { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make index 6b4fdd179183..3f2f3295a9e0 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/ya.make @@ -15,6 +15,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/scheme + yql/essentials/minikql ) GENERATE_ENUM_SERIALIZATION(columns_set.h) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp index ef01545efc96..d3654d881f57 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/constructor.cpp @@ -35,8 +35,7 @@ NKikimr::TConclusion> TIndexScannerConstructo TDataStorageAccessor dataAccessor(insertTable, index); AFL_VERIFY(read.PathId); - auto readMetadata = std::make_shared(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(), - IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), nullptr); + auto readMetadata = std::make_shared(index->CopyVersionedIndexPtr(), read); auto initResult = readMetadata->Init(self, read, dataAccessor); if (!initResult) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index f2001c538487..82362b1cf9e9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -28,12 +28,13 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c auto source = std::static_pointer_cast(sourceExt); if (source->NeedAccessorsFetching()) { if (!AskAccumulatorsScript) { - AskAccumulatorsScript = std::make_shared(*this); + NCommon::TFetchingScriptBuilder acc(*this); if (ui64 size = source->PredictAccessorsMemory()) { - AskAccumulatorsScript->AddStep(size, EStageFeaturesIndexes::Accessors); + acc.AddStep(std::make_shared(size, EStageFeaturesIndexes::Accessors)); } - AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*GetFFColumns()); + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared(*GetFFColumns())); + AskAccumulatorsScript = std::move(acc).Build(); } return AskAccumulatorsScript; } @@ -72,126 +73,123 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c if (result.HasScript()) { return result.GetScriptVerified(); } else { - std::shared_ptr result = std::make_shared(*this); - result->SetBranchName("FAKE"); - result->AddStep(); - return result; + NCommon::TFetchingScriptBuilder acc(*this); + acc.SetBranchName("FAKE"); + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } } } std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool exclusiveSource, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { - std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); + NCommon::TFetchingScriptBuilder acc(*this); if (!!IndexChecker && useIndexes && exclusiveSource) { - result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); - result->AddStep(std::make_shared(IndexChecker)); + acc.AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); + acc.AddStep(std::make_shared(IndexChecker)); } bool hasFilterSharding = false; if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { hasFilterSharding = true; - TColumnsSetIds columnsFetch = *GetShardingColumns(); + acc.AddFetchingStep(*GetShardingColumns(), EStageFeaturesIndexes::Filter); if (!exclusiveSource) { - columnsFetch = columnsFetch + *GetPKColumns() + *GetSpecColumns(); + acc.AddFetchingStep(*GetPKColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(acc.GetAddedFetchingColumns(), "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (!GetEFColumns()->GetColumnsCount() && !partialUsageByPredicate) { - result->SetBranchName("simple"); - TColumnsSetIds columnsFetch = *GetFFColumns(); + acc.SetBranchName("simple"); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Fetching); } if (needSnapshots) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Fetching); } if (!exclusiveSource) { - columnsFetch = columnsFetch + *GetMergeColumns(); + acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Fetching); } else { - if (columnsFetch.GetColumnsCount() == 1 && GetSpecColumns()->Contains(columnsFetch) && !hasFilterSharding) { + if (acc.GetAddedFetchingColumns().GetColumnsCount() == 1 && GetSpecColumns()->Contains(acc.GetAddedFetchingColumns()) && !hasFilterSharding) { return nullptr; } } - if (columnsFetch.GetColumnsCount() || hasFilterSharding || needFilterDeletion) { - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Fetching); + if (acc.GetAddedFetchingColumns().GetColumnsCount() || hasFilterSharding || needFilterDeletion) { if (needSnapshots) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Fetching, false); } if (!exclusiveSource) { - acc.AddAssembleStep(*result, *GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); + acc.AddAssembleStep(*GetMergeColumns(), "LAST_PK", EStageFeaturesIndexes::Fetching, false); } if (needSnapshots) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Fetching, false); + acc.AddStep(std::make_shared()); } - acc.AddAssembleStep(*result, columnsFetch, "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddAssembleStep(acc.GetAddedFetchingColumns().GetColumnIds(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } else { return nullptr; } } else if (exclusiveSource) { - result->SetBranchName("exclusive"); - TColumnsSet columnsFetch = *GetEFColumns(); + acc.SetBranchName("exclusive"); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *GetPredicateColumns(); + acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); } - AFL_VERIFY(columnsFetch.GetColumnsCount()); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } - acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } else { - result->SetBranchName("merge"); - TColumnsSet columnsFetch = *GetMergeColumns() + *GetEFColumns(); + acc.SetBranchName("merge"); + acc.AddFetchingStep(*GetMergeColumns(), EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } - AFL_VERIFY(columnsFetch.GetColumnsCount()); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + AFL_VERIFY(acc.GetAddedFetchingColumns().GetColumnsCount()); - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - acc.AddAssembleStep(*result, *GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddAssembleStep(*GetPKColumns(), "PK", EStageFeaturesIndexes::Filter, false); if (needSnapshots) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - result->AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); } - acc.AddFetchingStep(*result, *GetFFColumns(), EStageFeaturesIndexes::Fetching); - acc.AddAssembleStep(*result, *GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); + acc.AddFetchingStep(*GetFFColumns(), EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(*GetFFColumns(), "LAST", EStageFeaturesIndexes::Fetching, !exclusiveSource); } - result->AddStep(); - return result; + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) @@ -207,7 +205,7 @@ TString TSpecialReadContext::ProfileDebugString() const { for (ui32 i = 0; i < (1 << 6); ++i) { auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)]; if (script.HasScript()) { - sb << script.DebugString() << ";"; + sb << script.ProfileDebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp index b7034b00d179..07b1a7653b12 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/constructor.cpp @@ -34,8 +34,7 @@ NKikimr::TConclusion> TIndexScannerConstructo TDataStorageAccessor dataAccessor(insertTable, index); AFL_VERIFY(read.PathId); - auto readMetadata = std::make_shared(read.PathId, index->CopyVersionedIndexPtr(), read.GetSnapshot(), - IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC, read.GetProgram(), read.GetScanCursor()); + auto readMetadata = std::make_shared(index->CopyVersionedIndexPtr(), read); auto initResult = readMetadata->Init(self, read, dataAccessor); if (!initResult) { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp index 3e62f0bd61f4..3a41ccab53fd 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp @@ -13,11 +13,12 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c GetFFColumns()->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX); if (!dontNeedColumns && !source->GetStageData().HasPortionAccessor()) { if (!AskAccumulatorsScript) { - AskAccumulatorsScript = std::make_shared(*this); - AskAccumulatorsScript->AddStep( - source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors); - AskAccumulatorsScript->AddStep(); - AskAccumulatorsScript->AddStep(*GetFFColumns()); + NCommon::TFetchingScriptBuilder acc(*this); + acc.AddStep(std::make_shared + (source->PredictAccessorsSize(GetFFColumns()->GetColumnIds()), EStageFeaturesIndexes::Accessors)); + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared(*GetFFColumns())); + AskAccumulatorsScript = std::move(acc).Build(); } return AskAccumulatorsScript; } @@ -57,68 +58,63 @@ std::shared_ptr TSpecialReadContext::DoGetColumnsFetchingPlan(c std::shared_ptr TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt, const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const { - std::shared_ptr result = std::make_shared(*this); const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount(); - NCommon::TColumnsAccumulator acc(GetMergeColumns(), GetReadMetadata()->GetResultSchema()); + NCommon::TFetchingScriptBuilder acc(*this); if (!!IndexChecker && useIndexes) { - result->AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); - result->AddStep(std::make_shared(IndexChecker)); + acc.AddStep(std::make_shared(std::make_shared(IndexChecker->GetIndexIds()))); + acc.AddStep(std::make_shared(IndexChecker)); } if (needFilterSharding && !GetShardingColumns()->IsEmpty()) { const TColumnsSetIds columnsFetch = *GetShardingColumns(); - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); - acc.AddAssembleStep(*result, columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddFetchingStep(columnsFetch, EStageFeaturesIndexes::Filter); + acc.AddAssembleStep(columnsFetch, "SPEC_SHARDING", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } { - result->SetBranchName("exclusive"); - TColumnsSet columnsFetch; + acc.SetBranchName("exclusive"); + acc.AddFetchingStep(*GetEFColumns(), EStageFeaturesIndexes::Filter); if (needFilterDeletion) { - columnsFetch = columnsFetch + *GetDeletionColumns(); + acc.AddFetchingStep(*GetDeletionColumns(), EStageFeaturesIndexes::Filter); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - columnsFetch = columnsFetch + *GetSpecColumns(); + acc.AddFetchingStep(*GetSpecColumns(), EStageFeaturesIndexes::Filter); } if (partialUsageByPredicate) { - columnsFetch = columnsFetch + *GetPredicateColumns(); - } - - if (columnsFetch.GetColumnsCount()) { - acc.AddFetchingStep(*result, columnsFetch, EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(*GetPredicateColumns(), EStageFeaturesIndexes::Filter); } if (needFilterDeletion) { - acc.AddAssembleStep(*result, *GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetDeletionColumns(), "SPEC_DELETION", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (partialUsageByPredicate) { - acc.AddAssembleStep(*result, *GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) { - acc.AddAssembleStep(*result, *GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); - result->AddStep(std::make_shared()); + acc.AddAssembleStep(*GetSpecColumns(), "SPEC", EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); } const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified(); for (ui32 stepIdx = 0; stepIdx < chainProgram->GetProcessors().size(); ++stepIdx) { auto& step = chainProgram->GetProcessors()[stepIdx]; if (!chainProgram->GetLastOriginalDataFilter() && GetReadMetadata()->HasLimit() && step->GetProcessorType() == NArrow::NSSA::EProcessorType::Projection) { - result->AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); + acc.AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); } - result->AddStep(std::make_shared(step)); - result->AddStep(std::make_shared(step)); - result->AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); + acc.AddStep(std::make_shared(step)); if (step->GetProcessorType() == NArrow::NSSA::EProcessorType::Filter && GetReadMetadata()->HasLimit() && chainProgram->GetLastOriginalDataFilter() == stepIdx) { - result->AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); + acc.AddStep(std::make_shared(GetReadMetadata()->GetLimitRobust(), GetReadMetadata()->IsDescSorted())); } } } - result->AddStep(); - result->AddStep(); - return result; + acc.AddStep(std::make_shared()); + acc.AddStep(std::make_shared()); + return std::move(acc).Build(); } TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& commonContext) @@ -134,7 +130,7 @@ TString TSpecialReadContext::ProfileDebugString() const { for (ui32 i = 0; i < (1 << 5); ++i) { auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)]; if (script.HasScript()) { - sb << script.DebugString() << ";"; + sb << script.ProfileDebugString() << ";"; } } return sb; diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp index ee4dbdb5d376..63fd491cb010 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp @@ -8,10 +8,18 @@ #include -#include - namespace NKikimr::NOlap::NReader::NSimple { +TConclusion IFetchingStep::DoExecuteInplace( + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const { + const auto source = std::static_pointer_cast(sourceExt); + return DoExecuteInplace(source, step); +} + +ui64 IFetchingStep::GetProcessingDataSize(const std::shared_ptr& source) const { + return GetProcessingDataSize(std::static_pointer_cast(source)); +} + TConclusion TIndexBlobsFetchingStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { return !source->StartFetchingIndexes(source, step, Indexes); @@ -170,7 +178,7 @@ TConclusion TBuildResultStep::DoExecuteInplace(const std::shared_ptr TPrepareResultStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*step*/) const { - std::shared_ptr plan = std::make_shared(*source->GetContext()); + NCommon::TFetchingScriptBuilder acc(*source->GetContext()); if (source->IsSourceInMemory()) { AFL_VERIFY(source->GetStageResult().GetPagesToResultVerified().size() == 1); } @@ -179,8 +187,9 @@ TConclusion TPrepareResultStep::DoExecuteInplace(const std::shared_ptrGetSourceId(), i.GetIndexStart(), i.GetRecordsCount())) { continue; } - plan->AddStep(i.GetIndexStart(), i.GetRecordsCount()); + acc.AddStep(std::make_shared(i.GetIndexStart(), i.GetRecordsCount())); } + auto plan = std::move(acc).Build(); AFL_VERIFY(!plan->IsFinished(0)); source->InitFetchingPlan(plan); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h index 460ec3b44190..d6538ae2fdf6 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h @@ -32,14 +32,9 @@ class IFetchingStep: public NCommon::IFetchingStep { } virtual TConclusion DoExecuteInplace( - const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final { - const auto source = std::static_pointer_cast(sourceExt); - return DoExecuteInplace(source, step); - } + const std::shared_ptr& sourceExt, const TFetchingScriptCursor& step) const override final; - virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final { - return GetProcessingDataSize(std::static_pointer_cast(source)); - } + virtual ui64 GetProcessingDataSize(const std::shared_ptr& source) const override final; public: using TBase::TBase; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_script.cpp b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp new file mode 100644 index 000000000000..a50388e8bfb9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/ut/ut_script.cpp @@ -0,0 +1,53 @@ +#include + +#include +#include +#include +#include + +using namespace NKikimr; +using namespace NKikimr::NOlap; +using namespace NKikimr::NOlap::NReader; + +Y_UNIT_TEST_SUITE(TestScript) { + std::shared_ptr MakeTestSchema(THashMap columns, const std::vector pkIds = { 0 }) { + for (ui64 i = 0; i < pkIds.size(); ++i) { + TValidator::CheckNotNull(columns.FindPtr(pkIds[i]))->KeyOrder = i; + } + + auto cache = std::make_shared(); + TIndexInfo info = TIndexInfo::BuildDefault(TTestStoragesManager::GetInstance(), columns, pkIds); + return std::make_shared(cache->UpsertIndexInfo(0, std::move(info)), TSnapshot(1, 1)); + } + + Y_UNIT_TEST(StepMerging) { + NCommon::TFetchingScriptBuilder acc = NCommon::TFetchingScriptBuilder::MakeForTests( + MakeTestSchema({ { 0, NTable::TColumn("c0", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, + { 1, NTable::TColumn("c1", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") }, + { 2, NTable::TColumn("c2", 0, NScheme::TTypeInfo(NScheme::NTypeIds::Int32), "") } })); + + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddAssembleStep(std::vector({ 0 }), "", NCommon::EStageFeaturesIndexes::Filter, false); + acc.AddStep(std::make_shared()); + acc.AddFetchingStep(std::vector({ 0, 1 }), NCommon::EStageFeaturesIndexes::Filter); + acc.AddFetchingStep(std::vector({ 1, 2 }), NCommon::EStageFeaturesIndexes::Fetching); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Fetching); + acc.AddAssembleStep(std::vector({ 0, 1, 2 }), "", NCommon::EStageFeaturesIndexes::Fetching, false); + acc.AddStep(std::make_shared()); + acc.AddFetchingStep(std::vector({ 0 }), NCommon::EStageFeaturesIndexes::Merge); + + auto script = std::move(acc).Build(); + UNIT_ASSERT_STRINGS_EQUAL(script->DebugString(), + "{branch:UNDEFINED;steps:[" + "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:0,Raw:0];};};" + "{name=FETCHING_COLUMNS;details={columns=0;};};" + "{name=ASSEMBLER;details={columns=(column_ids=0;column_names=c0;);;};};" + "{name=DELETION;details={};};" + "{name=ALLOCATE_MEMORY::Filter;details={stage=Filter;column_ids=[Blob:1];};};" + "{name=ALLOCATE_MEMORY::Fetching;details={stage=Fetching;column_ids=[Blob:2,Raw:1,Raw:2];};};" + "{name=FETCHING_COLUMNS;details={columns=1,2;};};" + "{name=ASSEMBLER;details={columns=(column_ids=1,2;column_names=c1,c2;);;};};" + "{name=DELETION;details={};};]}"); + } +} diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make index 5b6490e05838..215cfe63763d 100644 --- a/ydb/core/tx/columnshard/engines/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/ut/ya.make @@ -34,6 +34,7 @@ SRCS( ut_insert_table.cpp ut_logs_engine.cpp ut_program.cpp + ut_script.cpp helper.cpp )