diff --git a/ydb/core/formats/arrow/program/abstract.cpp b/ydb/core/formats/arrow/program/abstract.cpp index 5f3850c1880f..dd6e58ad7c01 100644 --- a/ydb/core/formats/arrow/program/abstract.cpp +++ b/ydb/core/formats/arrow/program/abstract.cpp @@ -20,13 +20,13 @@ NJson::TJsonValue IResourceProcessor::DebugJson() const { return result; } -TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr& resources) const { +TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr& resources, const TProcessorContext& context) const { for (auto&& i : Output) { if (resources->HasColumn(i.GetColumnId())) { return TConclusionStatus::Fail("column " + ::ToString(i.GetColumnId()) + " has already"); } } - return DoExecute(resources); + return DoExecute(resources, context); } std::optional IResourceProcessor::BuildFetchTask( diff --git a/ydb/core/formats/arrow/program/abstract.h b/ydb/core/formats/arrow/program/abstract.h index 35563d089a8c..7365d30c3c19 100644 --- a/ydb/core/formats/arrow/program/abstract.h +++ b/ydb/core/formats/arrow/program/abstract.h @@ -164,7 +164,8 @@ enum class EProcessorType { Calculation, Projection, Filter, - Aggregation + Aggregation, + Original }; class TFetchingInfo { @@ -187,20 +188,53 @@ class TFetchingInfo { } }; +class TProcessorContext { +protected: + std::vector ColumnsToFetch; + std::vector OriginalColumnsToUse; + std::vector ColumnsToDrop; + +public: + const std::vector& GetColumnsToFetch() const { + return ColumnsToFetch; + } + const std::vector& GetOriginalColumnsToUse() const { + return OriginalColumnsToUse; + } + const std::vector& GetColumnsToDrop() const { + return ColumnsToDrop; + } + + TProcessorContext( + std::vector&& toFetch, std::vector&& originalToUse, std::vector&& toDrop) + : ColumnsToFetch(std::move(toFetch)) + , OriginalColumnsToUse(std::move(originalToUse)) + , ColumnsToDrop(std::move(toDrop)) { + } +}; + class IResourceProcessor { private: YDB_READONLY_DEF(std::vector, Input); YDB_READONLY_DEF(std::vector, 
Output); YDB_READONLY(EProcessorType, ProcessorType, EProcessorType::Unknown); - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const = 0; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const = 0; virtual NJson::TJsonValue DoDebugJson() const { return NJson::JSON_MAP; } + virtual ui64 DoGetWeight() const { + return 0; + } public: - virtual std::optional BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr& resources) const; + ui64 GetWeight() const { + return DoGetWeight(); + } + + virtual std::optional BuildFetchTask( + const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr& resources) const; virtual bool IsAggregation() const = 0; @@ -224,25 +258,21 @@ class IResourceProcessor { , ProcessorType(type) { } - [[nodiscard]] TConclusionStatus Execute(const std::shared_ptr& resources) const; + [[nodiscard]] TConclusionStatus Execute(const std::shared_ptr& resources, const TProcessorContext& context) const; }; -class TResourceProcessorStep { +class TResourceProcessorStep: public TProcessorContext { private: - YDB_READONLY_DEF(std::vector, ColumnsToFetch); - YDB_READONLY_DEF(std::vector, OriginalColumnsToUse); + using TBase = TProcessorContext; YDB_READONLY_DEF(std::shared_ptr, Processor); - YDB_READONLY_DEF(std::vector, ColumnsToDrop); public: NJson::TJsonValue DebugJson() const; TResourceProcessorStep(std::vector&& toFetch, std::vector&& originalToUse, std::shared_ptr&& processor, std::vector&& toDrop) - : ColumnsToFetch(std::move(toFetch)) - , OriginalColumnsToUse(std::move(originalToUse)) - , Processor(std::move(processor)) - , ColumnsToDrop(std::move(toDrop)) { + : TBase(std::move(toFetch), std::move(originalToUse), std::move(toDrop)) + , Processor(std::move(processor)) { AFL_VERIFY(Processor); } diff --git a/ydb/core/formats/arrow/program/aggr_keys.cpp b/ydb/core/formats/arrow/program/aggr_keys.cpp 
index 3b16025caa61..c1c3d29a1993 100644 --- a/ydb/core/formats/arrow/program/aggr_keys.cpp +++ b/ydb/core/formats/arrow/program/aggr_keys.cpp @@ -66,7 +66,8 @@ CH::AggFunctionId TWithKeysAggregationOption::GetHouseFunction(const EAggregate return CH::AggFunctionId::AGG_UNSPECIFIED; } -TConclusionStatus TWithKeysAggregationProcessor::DoExecute(const std::shared_ptr& resources) const { +TConclusionStatus TWithKeysAggregationProcessor::DoExecute( + const std::shared_ptr& resources, const TProcessorContext& /*context*/) const { CH::GroupByOptions funcOpts; funcOpts.assigns.reserve(AggregationKeys.size() + Aggregations.size()); funcOpts.has_nullable_key = false; diff --git a/ydb/core/formats/arrow/program/aggr_keys.h b/ydb/core/formats/arrow/program/aggr_keys.h index 9950927aa464..24cadba8a63e 100644 --- a/ydb/core/formats/arrow/program/aggr_keys.h +++ b/ydb/core/formats/arrow/program/aggr_keys.h @@ -20,7 +20,7 @@ class TAggregateFunction: public TInternalFunction { } virtual TConclusion Call( const TExecFunctionContext& context, const std::shared_ptr& resources) const override; - + TConclusion PrepareResult(arrow::Datum&& datum) const override { if (!datum.is_scalar()) { return TConclusionStatus::Fail("Aggregate result is not a scalar."); @@ -103,13 +103,14 @@ class TAggregateFunction: public TInternalFunction { return ""; } - virtual TConclusionStatus CheckIO(const std::vector& /*input*/, const std::vector& output) const override { + virtual TConclusionStatus CheckIO( + const std::vector& /*input*/, const std::vector& output) const override { if (output.size() != 1) { return TConclusionStatus::Fail("output size != 1 (" + ::ToString(output.size()) + ")"); } -// if (input.size() != 1) { -// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")"); -// } + // if (input.size() != 1) { + // return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")"); + // } return TConclusionStatus::Success(); } }; @@ -149,7 +150,7 @@ 
class TWithKeysAggregationProcessor: public IResourceProcessor { std::vector AggregationKeys; std::vector Aggregations; - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const override; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const override; TWithKeysAggregationProcessor(std::vector&& input, std::vector&& output, std::vector&& aggregationKeys, std::vector&& aggregations) diff --git a/ydb/core/formats/arrow/program/assign_const.cpp b/ydb/core/formats/arrow/program/assign_const.cpp index 1d01cb7cd69c..726cbd2ffddd 100644 --- a/ydb/core/formats/arrow/program/assign_const.cpp +++ b/ydb/core/formats/arrow/program/assign_const.cpp @@ -10,7 +10,7 @@ namespace NKikimr::NArrow::NSSA { -TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr& resources) const { +TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr& resources, const TProcessorContext& /*context*/) const { AFL_VERIFY(GetInput().empty()); resources->AddConstantVerified(GetOutputColumnIdOnce(), ScalarConstant); return TConclusionStatus::Success(); diff --git a/ydb/core/formats/arrow/program/assign_const.h b/ydb/core/formats/arrow/program/assign_const.h index 5cbebe617b01..283721847718 100644 --- a/ydb/core/formats/arrow/program/assign_const.h +++ b/ydb/core/formats/arrow/program/assign_const.h @@ -8,7 +8,7 @@ class TConstProcessor: public IResourceProcessor { using TBase = IResourceProcessor; YDB_READONLY_DEF(std::shared_ptr, ScalarConstant); - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const override; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const override; virtual bool IsAggregation() const override { return false; diff --git a/ydb/core/formats/arrow/program/assign_internal.cpp b/ydb/core/formats/arrow/program/assign_internal.cpp index db87ede4d552..0d2fc2833df8 100644 --- 
a/ydb/core/formats/arrow/program/assign_internal.cpp +++ b/ydb/core/formats/arrow/program/assign_internal.cpp @@ -4,7 +4,8 @@ namespace NKikimr::NArrow::NSSA { -TConclusionStatus TCalculationProcessor::DoExecute(const std::shared_ptr& resources) const { +TConclusionStatus TCalculationProcessor::DoExecute( + const std::shared_ptr& resources, const TProcessorContext& /*context*/) const { if (KernelLogic) { auto resultKernel = KernelLogic->Execute(GetInput(), GetOutput(), resources); if (resultKernel.IsFail()) { diff --git a/ydb/core/formats/arrow/program/assign_internal.h b/ydb/core/formats/arrow/program/assign_internal.h index de97c1c4c8a7..f2e7caf5f4d1 100644 --- a/ydb/core/formats/arrow/program/assign_internal.h +++ b/ydb/core/formats/arrow/program/assign_internal.h @@ -3,6 +3,8 @@ #include "functions.h" #include "kernel_logic.h" +#include + namespace NKikimr::NArrow::NSSA { class TCalculationProcessor: public IResourceProcessor { @@ -14,7 +16,7 @@ class TCalculationProcessor: public IResourceProcessor { std::shared_ptr Function; - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const override; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const override; TCalculationProcessor(std::vector&& input, std::vector&& output, const std::shared_ptr& function, const std::shared_ptr& kernelLogic) @@ -27,6 +29,23 @@ class TCalculationProcessor: public IResourceProcessor { return Function->IsAggregation(); } + virtual ui64 DoGetWeight() const override { + if (KernelLogic) { + return 0; + } + if (!YqlOperationId) { + return 10; + } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith || + (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) { + return 7; + } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == 
NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { + return 10; + } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::Equals) { + return 5; + } + return 0; + } + public: virtual std::optional BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr& resources) const override { diff --git a/ydb/core/formats/arrow/program/chain.cpp b/ydb/core/formats/arrow/program/chain.cpp index b3b9f3033740..4aa193e0e816 100644 --- a/ydb/core/formats/arrow/program/chain.cpp +++ b/ydb/core/formats/arrow/program/chain.cpp @@ -1,5 +1,6 @@ #include "chain.h" #include "collection.h" +#include "graph.h" namespace NKikimr::NArrow::NSSA { @@ -39,7 +40,17 @@ class TColumnUsage { }; } // namespace -TConclusion TProgramChain::Build(std::vector>&& processors, const IColumnResolver& resolver) { +TConclusion TProgramChain::Build(std::vector>&& processorsExt, const IColumnResolver& resolver) { + NOptimization::TGraph graph(std::move(processorsExt), resolver); + auto conclusion = graph.Collapse(); + if (conclusion.IsFail()) { + return conclusion; + } + auto processorsConclusion = graph.BuildChain(); + if (processorsConclusion.IsFail()) { + return processorsConclusion; + } + auto processors = processorsConclusion.DetachResult(); THashMap contextUsage; ui32 stepIdx = 0; THashSet sourceColumns; @@ -149,7 +160,7 @@ TConclusionStatus TProgramChain::Initialize() { TConclusionStatus TProgramChain::Apply(const std::shared_ptr& resources) const { for (auto&& i : Processors) { - auto status = i->Execute(resources); + auto status = i->Execute(resources, i); if (status.IsFail()) { return status; } diff --git a/ydb/core/formats/arrow/program/filter.cpp b/ydb/core/formats/arrow/program/filter.cpp index f40e498b4db8..0ca6276f9ae0 100644 --- a/ydb/core/formats/arrow/program/filter.cpp +++ b/ydb/core/formats/arrow/program/filter.cpp @@ -60,21 +60,21 @@ class TFilterVisitor: public arrow::ArrayVisitor { 
arrow::Status VisitImpl(const TArray& array) { AFL_VERIFY(Started); for (ui32 i = 0; i < array.length(); ++i) { - const bool columnValue = (bool)array.Value(i); const ui32 currentIdx = CursorIdx++; - FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue; + FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && !array.IsNull(i) && (bool)array.Value(i); } AFL_VERIFY(CursorIdx <= FiltersMerged.size()); return arrow::Status::OK(); } }; -TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr& resources) const { +TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const { std::vector> inputColumns; - if (ReuseColumns) { - inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput())); - } else { + AFL_VERIFY(context.GetColumnsToDrop().size() <= 1)("size", context.GetColumnsToDrop().size()); + if (context.GetColumnsToDrop().size() && GetInputColumnIdOnce() == context.GetColumnsToDrop().front()) { inputColumns = resources->ExtractAccessors(TColumnChainInfo::ExtractColumnIds(GetInput())); + } else { + inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput())); } TFilterVisitor filterVisitor(inputColumns.front()->GetRecordsCount()); for (auto& arr : inputColumns) { diff --git a/ydb/core/formats/arrow/program/filter.h b/ydb/core/formats/arrow/program/filter.h index 434a47077a52..dbccdf5592e3 100644 --- a/ydb/core/formats/arrow/program/filter.h +++ b/ydb/core/formats/arrow/program/filter.h @@ -6,25 +6,20 @@ namespace NKikimr::NArrow::NSSA { class TFilterProcessor: public IResourceProcessor { private: using TBase = IResourceProcessor; - const bool ReuseColumns; - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const override; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const override; virtual bool IsAggregation() const override { return false; } public: 
- TFilterProcessor(std::vector&& input, const bool reuseColumns = false) - : TBase(std::move(input), {}, EProcessorType::Filter) - , ReuseColumns(reuseColumns) - { - AFL_VERIFY(GetInput().size()); + TFilterProcessor(std::vector&& input) + : TBase(std::move(input), {}, EProcessorType::Filter) { + AFL_VERIFY(GetInput().size() == 1)("size", GetInput().size()); } - TFilterProcessor(const TColumnChainInfo& input, const bool reuseColumns = false) - : TBase({ input }, {}, EProcessorType::Filter) - , ReuseColumns(reuseColumns) - { + TFilterProcessor(const TColumnChainInfo& input) + : TBase({ input }, {}, EProcessorType::Filter) { } }; diff --git a/ydb/core/formats/arrow/program/graph.cpp b/ydb/core/formats/arrow/program/graph.cpp new file mode 100644 index 000000000000..7246dd5d69b2 --- /dev/null +++ b/ydb/core/formats/arrow/program/graph.cpp @@ -0,0 +1,281 @@ +#include "assign_const.h" +#include "assign_internal.h" +#include "filter.h" +#include "graph.h" +#include "original.h" + +#include + +#include + +namespace NKikimr::NArrow::NSSA::NOptimization { + +TGraph::TGraph(std::vector>&& processors, const IColumnResolver& resolver) { + for (auto&& i : processors) { + auto node = std::make_shared(i); + Nodes.emplace(node->GetIdentifier(), node); + for (auto&& output : i->GetOutput()) { + AFL_VERIFY(Producers.emplace(output.GetColumnId(), node.get()).second); + } + for (auto&& input : i->GetInput()) { + if (Producers.find(input.GetColumnId()) != Producers.end()) { + continue; + } + const TString name = resolver.GetColumnName(input.GetColumnId(), false); + if (!!name) { + auto nodeInput = std::make_shared( + std::make_shared(input.GetColumnId(), resolver.GetColumnName(input.GetColumnId()))); + Nodes.emplace(nodeInput->GetIdentifier(), nodeInput); + Producers.emplace(input.GetColumnId(), nodeInput.get()); + } + } + } + for (auto&& [_, i] : Nodes) { + for (auto&& p : i->GetProcessor()->GetInput()) { + auto node = GetProducerVerified(p.GetColumnId()); + 
node->AddDataTo(p.GetColumnId(), i); + i->AddDataFrom(p.GetColumnId(), node); + } + } +} + +TConclusion TGraph::OptimizeFilter(TGraphNode* filterNode) { + if (filterNode->GetProcessor()->GetProcessorType() != EProcessorType::Filter) { + return false; + } + if (filterNode->GetDataFrom().size() != 1) { + return TConclusionStatus::Fail("incorrect filter incoming columns (!= 1) : " + ::ToString(filterNode->GetDataFrom().size())); + } + auto* first = filterNode->GetDataFrom().begin()->second; + if (first->GetProcessor()->GetProcessorType() != EProcessorType::Calculation) { + return false; + } + auto calc = first->GetProcessorAs(); + if (!calc->GetYqlOperationId()) { + return false; + } + { + auto conclusion = OptimizeFilterWithAnd(filterNode, first, calc); + if (conclusion.IsFail()) { + return conclusion; + } + if (*conclusion) { + return true; + } + } + { + auto conclusion = OptimizeFilterWithCoalesce(filterNode, first, calc); + if (conclusion.IsFail()) { + return conclusion; + } + if (*conclusion) { + return true; + } + } + return false; +} + +TConclusion TGraph::OptimizeFilterWithAnd( + TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr& calc) { + if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() != NYql::TKernelRequestBuilder::EBinaryOp::And) { + return false; + } + if (calc->GetInput().size() < 2) { + return TConclusionStatus::Fail("incorrect and operation incoming columns (< 2) : " + ::ToString(calc->GetInput().size())); + } + for (auto&& c : calc->GetInput()) { + AddNode(std::make_shared(TColumnChainInfo(c))); + } + DetachNode(filterNode); + DetachNode(filterArg); + RemoveNode(filterNode); + RemoveNode(filterArg); + Cerr << DebugJson() << Endl; + return true; +} + +TConclusion TGraph::OptimizeFilterWithCoalesce( + TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr& calc) { + if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() != NYql::TKernelRequestBuilder::EBinaryOp::Coalesce) { + return 
false; + } + if (calc->GetInput().size() != 2) { + return TConclusionStatus::Fail("incorrect coalesce incoming columns (!= 2) : " + ::ToString(calc->GetInput().size())); + } + TGraphNode* dataNode = GetProducerVerified(calc->GetInput()[0].GetColumnId()); + TGraphNode* argNode = GetProducerVerified(calc->GetInput()[1].GetColumnId()); + if (argNode->GetProcessor()->GetProcessorType() != EProcessorType::Const) { + return false; + } + auto scalar = argNode->GetProcessorAs()->GetScalarConstant(); + if (!scalar) { + return TConclusionStatus::Fail("coalesce with null arg is impossible"); + } + if (scalar) { + bool doOptimize = false; + NArrow::SwitchType(scalar->type->id(), [&](const auto& type) { + using TWrap = std::decay_t; + using T = typename TWrap::T; + using TScalar = typename arrow::TypeTraits::ScalarType; + auto& typedScalar = static_cast(*scalar); + if constexpr (arrow::has_c_type()) { + doOptimize = (typedScalar.value == 0); + } + return true; + }); + if (!doOptimize) { + return false; + } + } + for (auto&& c : dataNode->GetProcessor()->GetOutput()) { + AddNode(std::make_shared(TColumnChainInfo(c.GetColumnId()))); + } + DetachNode(filterNode); + DetachNode(filterArg); + RemoveNode(filterNode); + RemoveNode(filterArg); + if (argNode->GetDataFrom().empty() && argNode->GetDataTo().empty()) { + RemoveNode(argNode); + } + return true; +} + +TConclusionStatus TGraph::Collapse() { + bool hasChanges = true; + // Cerr << DebugJson() << Endl; + while (hasChanges) { + hasChanges = false; + for (auto&& [_, n] : Nodes) { + { + auto conclusion = OptimizeFilter(n.get()); + if (conclusion.IsFail()) { + return conclusion; + } + if (*conclusion) { + hasChanges = true; + break; + } + } + } + } + return TConclusionStatus::Success(); +} + +class TFilterChain { +private: + YDB_READONLY_DEF(std::vector, Nodes); + ui64 Weight = 0; + +public: + TFilterChain(const std::vector& nodes) + : Nodes(nodes) { + for (auto&& i : nodes) { + Weight += i->GetProcessor()->GetWeight(); + } + } + + 
bool operator<(const TFilterChain& item) const { + return Weight < item.Weight; + } +}; + +TConclusion>> TGraph::BuildChain() { + std::vector nodeChains; + THashSet readyNodeIds; + for (auto&& [_, i] : Nodes) { + if (i->GetProcessor()->GetProcessorType() == EProcessorType::Filter) { + std::vector chain = i->GetFetchingChain(); + std::vector actualChain; + for (auto&& c : chain) { + if (readyNodeIds.emplace(c->GetIdentifier()).second) { + actualChain.emplace_back(c); + } + } + AFL_VERIFY(actualChain.size()); + nodeChains.emplace_back(std::move(actualChain)); + } + } + std::sort(nodeChains.begin(), nodeChains.end()); + for (auto&& [_, i] : Nodes) { + if (i->GetProcessor()->GetProcessorType() != EProcessorType::Filter && i->GetProcessor()->GetOutput().empty()) { + std::vector chain = i->GetFetchingChain(); + std::vector actualChain; + for (auto&& c : chain) { + if (readyNodeIds.emplace(c->GetIdentifier()).second) { + actualChain.emplace_back(c); + } + } + AFL_VERIFY(actualChain.size()); + nodeChains.emplace_back(std::move(actualChain)); + } + } + if (readyNodeIds.size() != Nodes.size()) { + std::set notCoveredIds; + TStringBuilder sb; + ui32 count = 0; + for (auto&& [id, n] : Nodes) { + if (!readyNodeIds.contains(id)) { + if (n->GetProcessor()->GetProcessorType() != EProcessorType::Const) { + ++count; + } + sb << n->DebugJson().GetStringRobust() << "/" << n->GetProcessor()->DebugJson().GetStringRobust() << Endl; + } + } + if (count) { + return TConclusionStatus::Fail( + "not found final nodes: " + ::ToString(readyNodeIds.size()) + " covered from " + ::ToString(Nodes.size()) + ": details = " + sb); + } + } + std::vector> result; + for (auto&& c : nodeChains) { + for (auto&& p : c.GetNodes()) { + if (p->GetProcessor()->GetProcessorType() != EProcessorType::Original) { + result.emplace_back(p->GetProcessor()); + } + } + } + return result; +} + +void TGraph::AddNode(const std::shared_ptr& processor) { + auto node = std::make_shared(processor); + 
Nodes.emplace(node->GetIdentifier(), node); + for (auto&& i : processor->GetInput()) { + auto nodeProducer = GetProducerVerified(i.GetColumnId()); + nodeProducer->AddDataTo(i.GetColumnId(), node); + node->AddDataFrom(i.GetColumnId(), nodeProducer); + } +} + +void TGraph::RemoveNode(TGraphNode* node) { + Nodes.erase(node->GetIdentifier()); +} + +void TGraph::DetachNode(TGraphNode* node) { + for (auto&& i : node->GetDataFrom()) { + i.second->RemoveDataTo(i.first.AnotherNodeId(node->GetIdentifier())); + } + for (auto&& i : node->GetDataTo()) { + i.second->RemoveDataFrom(i.first.AnotherNodeId(node->GetIdentifier())); + } +} + +std::vector TGraphNode::GetFetchingChain() const { + std::vector result; + result.emplace_back(this); + ui32 frontStart = 0; + ui32 frontFinish = result.size(); + while (frontFinish > frontStart) { + for (ui32 i = frontStart; i < frontFinish; ++i) { + for (auto&& input : result[i]->GetDataFrom()) { + result.emplace_back(input.second); + } + } + frontStart = frontFinish; + frontFinish = result.size(); + } + std::reverse(result.begin(), result.end()); + return result; +} + +} // namespace NKikimr::NArrow::NSSA::NOptimization diff --git a/ydb/core/formats/arrow/program/graph.h b/ydb/core/formats/arrow/program/graph.h new file mode 100644 index 000000000000..d1e27b7094f8 --- /dev/null +++ b/ydb/core/formats/arrow/program/graph.h @@ -0,0 +1,147 @@ +#pragma once +#include "abstract.h" + +#include + +namespace NKikimr::NArrow::NSSA { +class TCalculationProcessor; +} + +namespace NKikimr::NArrow::NSSA::NOptimization { + +class TGraphNode { +private: + static inline TAtomicCounter Counter = 0; + YDB_READONLY(i64, Identifier, Counter.Inc()); + YDB_READONLY_DEF(std::shared_ptr, Processor); + class TAddress { + private: + const ui32 ColumnId; + const i64 NodeId; + + public: + TAddress(const ui32 columnId, const i64 nodeId) + : ColumnId(columnId) + , NodeId(nodeId) { + } + + TAddress AnotherNodeId(const i64 nodeId) const { + return TAddress(ColumnId, nodeId); 
+ } + + bool operator<(const TAddress& item) const { + return std::tie(ColumnId, NodeId) < std::tie(item.ColumnId, item.NodeId); + } + + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("c", ColumnId); + result.InsertValue("n", NodeId); + return result; + } + }; + std::map DataFrom; + std::map DataTo; + +public: + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("id", Identifier); + auto& inputArr = result.InsertValue("input", NJson::JSON_ARRAY); + for (auto&& i : DataFrom) { + inputArr.AppendValue(i.first.DebugJson()); + } + auto& outputArr = result.InsertValue("output", NJson::JSON_ARRAY); + for (auto&& i : DataTo) { + outputArr.AppendValue(i.first.DebugJson()); + } + return result; + } + + const std::map& GetDataTo() const { + return DataTo; + } + + const std::map& GetDataFrom() const { + return DataFrom; + } + + TGraphNode(const std::shared_ptr& processor) + : Processor(processor) { + AFL_VERIFY(Processor); + } + + template + std::shared_ptr GetProcessorAs() const { + return std::static_pointer_cast(Processor); + } + + void AddDataFrom(const ui32 columnId, const std::shared_ptr& node) { + AFL_VERIFY(node); + AFL_VERIFY(DataFrom.emplace(TAddress(columnId, node->GetIdentifier()), node.get()).second); + } + + void AddDataFrom(const ui32 columnId, TGraphNode* node) { + AFL_VERIFY(node); + AFL_VERIFY(DataFrom.emplace(TAddress(columnId, node->GetIdentifier()), node).second); + } + + void AddDataTo(const ui32 columnId, TGraphNode* node) { + AFL_VERIFY(node); + AFL_VERIFY(DataTo.emplace(TAddress(columnId, node->GetIdentifier()), node).second); + } + + void AddDataTo(const ui32 columnId, const std::shared_ptr& node) { + AFL_VERIFY(node); + AFL_VERIFY(DataTo.emplace(TAddress(columnId, node->GetIdentifier()), node.get()).second); + } + + void RemoveDataFrom(const TAddress& addr) { + AFL_VERIFY(DataFrom.erase(addr))("addr", addr.DebugJson())("info", DebugJson()); 
+ } + + void RemoveDataTo(const TAddress& addr) { + AFL_VERIFY(DataTo.erase(addr))("addr", addr.DebugJson())("info", DebugJson()); + } + + std::vector GetFetchingChain() const; +}; + +class TGraph { +private: + std::map> Nodes; + THashMap Producers; + TGraphNode* GetProducerVerified(const ui32 columnId) { + auto it = Producers.find(columnId); + AFL_VERIFY(it != Producers.end()); + return it->second; + } + TConclusion OptimizeFilter(TGraphNode* filterNode); + TConclusion OptimizeFilterWithCoalesce(TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr& calc); + TConclusion OptimizeFilterWithAnd(TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr& calc); + + void RemoveNode(TGraphNode* node); + void DetachNode(TGraphNode* node); + void Connect(TGraphNode* from, TGraphNode* to, const ui32 columnId) { + from->AddDataTo(columnId, to); + to->AddDataFrom(columnId, from); + } + + void AddNode(const std::shared_ptr& processor); + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + auto& nodesArr = result.InsertValue("nodes", NJson::JSON_ARRAY); + for (auto&& i : Nodes) { + nodesArr.AppendValue(i.second->DebugJson()); + } + return result; + } + +public: + TGraph(std::vector>&& processors, const IColumnResolver& resolver); + + TConclusionStatus Collapse(); + + TConclusion>> BuildChain(); +}; +} // namespace NKikimr::NArrow::NSSA::NOptimization diff --git a/ydb/core/formats/arrow/program/original.cpp b/ydb/core/formats/arrow/program/original.cpp new file mode 100644 index 000000000000..8dcb5daff7bc --- /dev/null +++ b/ydb/core/formats/arrow/program/original.cpp @@ -0,0 +1,5 @@ +#include "original.h" + +namespace NKikimr::NArrow::NSSA { + +} // namespace NKikimr::NArrow::NSSA diff --git a/ydb/core/formats/arrow/program/original.h b/ydb/core/formats/arrow/program/original.h new file mode 100644 index 000000000000..f5716173ce1e --- /dev/null +++ b/ydb/core/formats/arrow/program/original.h @@ -0,0 +1,39 @@ +#pragma 
once +#include "abstract.h" +#include "functions.h" +#include "kernel_logic.h" + +namespace NKikimr::NArrow::NSSA { + +class TOriginalColumnProcessor: public IResourceProcessor { +private: + using TBase = IResourceProcessor; + + YDB_ACCESSOR(ui32, ColumnId, 0); + YDB_ACCESSOR_DEF(TString, ColumnName); + + virtual TConclusionStatus DoExecute( + const std::shared_ptr& /*resources*/, const TProcessorContext& /*context*/) const override { + AFL_VERIFY(false); + return TConclusionStatus::Success(); + } + + virtual bool IsAggregation() const override { + return false; + } + +public: + TOriginalColumnProcessor(const ui32 columnId, const TString& columnName) + : TBase({}, { columnId }, EProcessorType::Original) + , ColumnName(columnName) { + AFL_VERIFY(!!ColumnName); + } + + virtual std::optional BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, + const std::shared_ptr& resources) const override { + AFL_VERIFY(false); + return TBase::BuildFetchTask(columnId, arrType, resources); + } +}; + +} // namespace NKikimr::NArrow::NSSA diff --git a/ydb/core/formats/arrow/program/projection.cpp b/ydb/core/formats/arrow/program/projection.cpp index 37951230f50e..092aa4f4c45a 100644 --- a/ydb/core/formats/arrow/program/projection.cpp +++ b/ydb/core/formats/arrow/program/projection.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NArrow::NSSA { -TConclusionStatus TProjectionProcessor::DoExecute(const std::shared_ptr& resources) const { +TConclusionStatus TProjectionProcessor::DoExecute( + const std::shared_ptr& resources, const TProcessorContext& /*context*/) const { resources->RemainOnly(TColumnChainInfo::ExtractColumnIds(GetInput()), true); return TConclusionStatus::Success(); } diff --git a/ydb/core/formats/arrow/program/projection.h b/ydb/core/formats/arrow/program/projection.h index 7084d33150d9..2c8c36437364 100644 --- a/ydb/core/formats/arrow/program/projection.h +++ b/ydb/core/formats/arrow/program/projection.h @@ -7,7 +7,7 @@ class TProjectionProcessor: 
public IResourceProcessor { private: using TBase = IResourceProcessor; - virtual TConclusionStatus DoExecute(const std::shared_ptr& resources) const override; + virtual TConclusionStatus DoExecute(const std::shared_ptr& resources, const TProcessorContext& context) const override; virtual bool IsAggregation() const override { return false; diff --git a/ydb/core/formats/arrow/program/ya.make b/ydb/core/formats/arrow/program/ya.make index 8b6b536668a8..860b73373570 100644 --- a/ydb/core/formats/arrow/program/ya.make +++ b/ydb/core/formats/arrow/program/ya.make @@ -23,6 +23,8 @@ ENDIF() SRCS( abstract.cpp + graph.cpp + original.cpp collection.cpp functions.cpp aggr_keys.cpp @@ -39,4 +41,6 @@ SRCS( GENERATE_ENUM_SERIALIZATION(abstract.h) GENERATE_ENUM_SERIALIZATION(aggr_common.h) +YQL_LAST_ABI_VERSION() + END() diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index a0c40c2a2008..e38b72e4b42d 100644 --- a/ydb/core/formats/arrow/ut/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -41,7 +41,7 @@ size_t FilterTest(const std::vector>& args, const TProgramChain::TBuilder builder(resolver); builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 2}), TColumnChainInfo(4), std::make_shared(op1)).DetachResult()); builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({4, 3}), TColumnChainInfo(5), std::make_shared(op2)).DetachResult()); - builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 5 }), true)); + builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 5 }))); builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 4, 5 }))); auto chain = builder.Finish().DetachResult(); auto resources = std::make_shared(); @@ -61,7 +61,7 @@ size_t FilterTestUnary(std::vector> args, const EO TProgramChain::TBuilder builder(resolver); builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1}), TColumnChainInfo(4), 
std::make_shared(op1)).DetachResult()); builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2, 4}), TColumnChainInfo(5), std::make_shared(op2)).DetachResult()); - builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 5 }), true)); + builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 5 }))); builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 4, 5 }))); auto chain = builder.Finish().DetachResult(); auto resources = std::make_shared(); @@ -488,7 +488,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) { TProgramChain::TBuilder builder(resolver); builder.Add(std::make_shared(std::make_shared(56), 3)); builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(4), std::make_shared(EOperation::Add)).DetachResult()); - builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 2 }), true)); + builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 2 }))); builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 2, 4 }))); auto chain = builder.Finish().DetachResult(); auto resources = std::make_shared(); @@ -501,6 +501,75 @@ Y_UNIT_TEST_SUITE(ProgramStep) { AFL_VERIFY(resources->GetRecordsCountVerified() == 2); }
+    Y_UNIT_TEST(TestValueFromNull) {   // Checks that slot 0 of an array built only with AppendNulls() reads back as 0 through Value().
+        arrow::UInt32Builder sb;
+        sb.AppendNulls(10).ok();   // the returned status is evaluated but not asserted
+        auto arr = std::dynamic_pointer_cast(*sb.Finish());
+        AFL_VERIFY(arr->Value(0) == 0)("val", arr->Value(0));
+    }
+
+    Y_UNIT_TEST(SplitFilterSimple) {   // Builds three predicates, each wrapped in a Coalesce and joined by two ANDs, then pins the chain produced by the builder via DebugString().
+        std::vector data = { "aa", "aaa", "aaaa", "bbbbb" };
+        arrow::StringBuilder sb;
+        sb.AppendValues(data).ok();   // the returned status is evaluated but not asserted
+
+        auto schema = std::make_shared(
+            std::vector{ std::make_shared("int", arrow::int64()), std::make_shared("string", arrow::utf8()) });
+        auto batch = arrow::RecordBatch::Make(schema, 4, std::vector{ NumVecToArray(arrow::int64(), { 64, 5, 1, 43 }), *sb.Finish() });
+        UNIT_ASSERT(batch->ValidateFull().ok());   // the batch is only validated here; the assertions below inspect the chain, not the data
+
+        TSchemaColumnResolver resolver(schema);
+        TProgramChain::TBuilder builder(resolver);
+        builder.Add(std::make_shared(std::make_shared(56), 3));   // value 56 registered under id 3 (appears as the "Const" step with output "3" in the expected chain)
+        builder.Add(std::make_shared(std::make_shared(0), 4));   // value 0 registered under id 4; id 4 is the second input of every Coalesce-tagged step below
+
+        {   // 1001: column 2, tagged as YQL StringContains
+            auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2}), TColumnChainInfo(1001), std::make_shared(EOperation::MatchSubstring)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StringContains);
+            builder.Add(proc);
+        }
+        {   // 1101: inputs 1001 and 4, tagged as YQL Coalesce
+            auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1001, 4}), TColumnChainInfo(1101), std::make_shared(EOperation::Add)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+            builder.Add(proc);
+        }
+        {   // 1002: column 2, tagged as YQL StartsWith
+            auto proc =
+                TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2}), TColumnChainInfo(1002), std::make_shared(EOperation::StartsWith)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StartsWith);
+            builder.Add(proc);
+        }
+        {   // 1102: inputs 1002 and 4, tagged as YQL Coalesce
+            auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1002, 4}), TColumnChainInfo(1102), std::make_shared(EOperation::Add)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+            builder.Add(proc);
+        }
+        {   // 1003: columns 1 and 3, tagged as YQL Equals
+            auto proc =
+                TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(1003), std::make_shared(EOperation::Equal)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Equals);
+            builder.Add(proc);
+        }
+        {   // 1103: inputs 1003 and 4, tagged as YQL Coalesce
+            auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1003, 4}), TColumnChainInfo(1103), std::make_shared(EOperation::Add)).DetachResult();
+            proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+            builder.Add(proc);
+        }
+
+        auto andOperator1 = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1101, 1102}), TColumnChainInfo(1104), std::make_shared(EOperation::And)).DetachResult();   // 1104 = 1101 AND 1102
+        andOperator1->SetYqlOperationId(0);
+        builder.Add(andOperator1);
+
+        auto andOperator2 = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1104, 1103}), TColumnChainInfo(1105), std::make_shared(EOperation::And)).DetachResult();   // 1105 = 1104 AND 1103
+        andOperator2->SetYqlOperationId(0);
+        builder.Add(andOperator2);
+        builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 1105 })));   // presumably the filter over 1105 - the processor type is not visible in this extract
+        builder.Add(std::make_shared(TColumnChainInfo::BuildVector({ 1, 2 })));   // presumably the projection to columns 1 and 2 (matches the last expected step)
+        auto chain = builder.Finish().DetachResult();
+        Cerr << chain->DebugJson() << Endl;   // dumps the built chain to stderr
+        AFL_VERIFY(chain->DebugString() == R"({"processors":[{"processor":{"internal":{},"type":"Const","output":"3"}},{"processor":{"internal":{},"type":"Calculation","input":"1,3","output":"1003"},"fetch":"1","drop":"3"},{"processor":{"internal":{},"type":"Filter","input":"1003"},"drop":"1003"},{"processor":{"internal":{},"type":"Calculation","input":"2","output":"1002"},"fetch":"2"},{"processor":{"internal":{},"type":"Filter","input":"1002"},"drop":"1002"},{"processor":{"internal":{},"type":"Calculation","input":"2","output":"1001"}},{"processor":{"internal":{},"type":"Filter","input":"1001"},"drop":"1001"},{"processor":{"internal":{},"type":"Projection","input":"1,2"}}]})");   // expected: one Filter per predicate (1003, 1002, 1001); no step with output 1101-1105 remains
+    }
+
     Y_UNIT_TEST(Projection) { auto schema = std::make_shared( std::vector{ std::make_shared("x", arrow::int64()), std::make_shared("y", arrow::boolean()) }); diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp index 937f049a43b9..aecf4dd77a65 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp @@ -207,7 +207,7 @@ TConclusion TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr TProgramStep::DoExecuteInplace(const std::shared_ptr& source, const TFetchingScriptCursor& /*cursor*/) const { // NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()( // 
"program", source->GetContext()->GetCommonContext()->GetReadMetadata()->GetProgram().ProtoDebugString()); - auto result = Step->Execute(source->GetStageData().GetTable()); + auto result = Step->Execute(source->GetStageData().GetTable(), Step); if (result.IsFail()) { return result; } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h index b6f29ff023aa..276751c9fa4f 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h @@ -78,10 +78,9 @@ class TColumnChunkRestoreInfo { i.second.GetBlobDataVerified().size()); std::vector chunks = { NArrow::NAccessor::TDeserializeChunkedArray::TChunk( GetRecordsCount(), i.second.GetBlobDataVerified()) }; -// const ui32 filledRecordsCount = PartialArray->GetHeader().GetColumnStats().GetColumnRecordsCount(i.second.GetColumnIdx()); const std::shared_ptr arrOriginal = deserialize - ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()/*, filledRecordsCount*/) + ? 
columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()) : std::make_shared(GetRecordsCount(), columnLoader, std::move(chunks), true); if (applyFilter) { PartialArray->AddColumn(i.first, applyFilter->Apply(arrOriginal)); @@ -122,8 +121,8 @@ class TColumnChunkRestoreInfo { AFL_VERIFY(!PartialArray); HeaderRange = std::nullopt; PartialArray = NArrow::NAccessor::NSubColumns::TConstructor::BuildPartialReader(blob, ChunkExternalInfo).DetachResult(); -// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns", PartialArray->GetHeader().GetColumnStats().DebugJson().GetStringRobust())( -// "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust()); + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns", PartialArray->GetHeader().GetColumnStats().DebugJson().GetStringRobust())( + // "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust()); } void InitPartialReader(const std::shared_ptr& accessor) {