Skip to content

Commit 1cef7e8

Browse files
Merge b58265b into b0bd8f9
2 parents b0bd8f9 + b58265b commit 1cef7e8

File tree

10 files changed

+497
-3
lines changed

10 files changed

+497
-3
lines changed

ydb/core/formats/arrow/program/abstract.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ enum class EProcessorType {
164164
Calculation,
165165
Projection,
166166
Filter,
167-
Aggregation
167+
Aggregation,
168+
Original
168169
};
169170

170171
class TFetchingInfo {
@@ -198,8 +199,15 @@ class IResourceProcessor {
198199
// Default debug payload for a processor: returns a value built from NJson::JSON_MAP
// (presumably an empty JSON map — confirm against NJson::TJsonValue).
// Virtual, so derived processors can supply their own debug description.
virtual NJson::TJsonValue DoDebugJson() const {
199200
return NJson::JSON_MAP;
200201
}
202+
// Customization point behind GetWeight(): the processor's weight.
// The base implementation reports 0; derived processors may override it.
virtual ui64 DoGetWeight() const {
203+
return 0;
204+
}
201205

202206
public:
207+
// Public accessor for the processor's weight; forwards to the virtual DoGetWeight().
ui64 GetWeight() const {
208+
return DoGetWeight();
209+
}
210+
203211
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
204212

205213
virtual bool IsAggregation() const = 0;

ydb/core/formats/arrow/program/assign_internal.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "functions.h"
44
#include "kernel_logic.h"
55

6+
#include <yql/essentials/core/arrow_kernels/request/request.h>
7+
68
namespace NKikimr::NArrow::NSSA {
79

810
class TCalculationProcessor: public IResourceProcessor {
@@ -27,6 +29,23 @@ class TCalculationProcessor: public IResourceProcessor {
2729
return Function->IsAggregation();
2830
}
2931

32+
// Weight of this calculation, reported through IResourceProcessor::GetWeight().
//  - a processor with custom KernelLogic weighs 0;
//  - a processor without a YQL operation id weighs 10;
//  - otherwise the weight is chosen by the YQL binary operation:
//    StartsWith / EndsWith -> 7, StringContains -> 10, Equals -> 5, anything else -> 0.
virtual ui64 DoGetWeight() const override {
    using EBinaryOp = NYql::TKernelRequestBuilder::EBinaryOp;

    if (KernelLogic) {
        return 0;
    }
    if (!YqlOperationId) {
        return 10;
    }
    switch (static_cast<EBinaryOp>(*YqlOperationId)) {
        case EBinaryOp::StartsWith:
        case EBinaryOp::EndsWith:
            return 7;
        case EBinaryOp::StringContains:
            return 10;
        case EBinaryOp::Equals:
            return 5;
        default:
            return 0;
    }
}
48+
3049
public:
3150
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType,
3251
const std::shared_ptr<TAccessorsCollection>& resources) const override {

ydb/core/formats/arrow/program/chain.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "chain.h"
22
#include "collection.h"
3+
#include "graph.h"
34

45
namespace NKikimr::NArrow::NSSA {
56

@@ -39,7 +40,17 @@ class TColumnUsage {
3940
};
4041
} // namespace
4142

42-
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
43+
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processorsExt, const IColumnResolver& resolver) {
44+
NOptimization::TGraph graph(std::move(processorsExt), resolver);
45+
auto conclusion = graph.Collapse();
46+
if (conclusion.IsFail()) {
47+
return conclusion;
48+
}
49+
auto processorsConclusion = graph.BuildChain();
50+
if (processorsConclusion.IsFail()) {
51+
return processorsConclusion;
52+
}
53+
auto processors = processorsConclusion.DetachResult();
4354
THashMap<ui32, TColumnUsage> contextUsage;
4455
ui32 stepIdx = 0;
4556
THashSet<ui32> sourceColumns;

ydb/core/formats/arrow/program/filter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class TFilterVisitor: public arrow::ArrayVisitor {
6060
arrow::Status VisitImpl(const TArray& array) {
6161
AFL_VERIFY(Started);
6262
for (ui32 i = 0; i < array.length(); ++i) {
63-
const bool columnValue = (bool)array.Value(i);
63+
const bool columnValue = !array.IsNull(i) && (bool)array.Value(i);
6464
const ui32 currentIdx = CursorIdx++;
6565
FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue;
6666
}
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
#include "assign_internal.h"
2+
#include "filter.h"
3+
#include "graph.h"
4+
#include "original.h"
5+
6+
#include <yql/essentials/core/arrow_kernels/request/request.h>
7+
8+
namespace NKikimr::NArrow::NSSA::NOptimization {
9+
10+
// Builds the processor dependency graph.
//
// Pass 1: wraps every processor into a TGraphNode and registers it as the producer of each
//         of its output columns (a column having two producers is a verified failure).
//         For an input column that has no producer yet and whose name the resolver knows,
//         a synthetic TOriginalColumnProcessor node is created to act as its producer.
// Pass 2: links every node with the producers of its inputs (edges in both directions).
//         GetProducerVerified is used, so every input is expected to have a producer by now.
TGraph::TGraph(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
    for (auto&& processor : processors) {
        auto processorNode = std::make_shared<TGraphNode>(processor);
        Nodes.emplace(processorNode->GetIdentifier(), processorNode);

        for (auto&& output : processor->GetOutput()) {
            AFL_VERIFY(Producers.emplace(output.GetColumnId(), processorNode.get()).second);
        }

        for (auto&& input : processor->GetInput()) {
            const bool hasProducer = Producers.find(input.GetColumnId()) != Producers.end();
            if (hasProducer) {
                continue;
            }
            // Non-verifying lookup: an empty name means the resolver does not know this column.
            const TString columnName = resolver.GetColumnName(input.GetColumnId(), false);
            if (!columnName) {
                continue;
            }
            auto originalProcessor =
                std::make_shared<TOriginalColumnProcessor>(input.GetColumnId(), resolver.GetColumnName(input.GetColumnId()));
            auto originalNode = std::make_shared<TGraphNode>(originalProcessor);
            Nodes.emplace(originalNode->GetIdentifier(), originalNode);
            Producers.emplace(input.GetColumnId(), originalNode.get());
        }
    }

    for (auto&& [_, consumer] : Nodes) {
        for (auto&& input : consumer->GetProcessor()->GetInput()) {
            auto producer = GetProducerVerified(input.GetColumnId());
            producer->AddDataTo(input.GetColumnId(), consumer);
            consumer->AddDataFrom(input.GetColumnId(), producer);
        }
    }
}
38+
39+
// Simplifies the graph by splitting filters that sit on top of certain calculations.
//
// For every Filter node whose single input is produced by a Calculation node:
//  - And:      the filter and the AND calculation are removed and replaced with one filter
//              per input column of the AND;
//  - Coalesce: the filter and the coalesce calculation are removed and replaced with one
//              filter per output column of the producer of the coalesce's first argument.
// The scan restarts after every rewrite (Nodes is modified) and stops when a full pass
// makes no change.
//
// Returns Fail when a filter does not have exactly one incoming column or when a coalesce
// does not have exactly two inputs; Success otherwise.
TConclusionStatus TGraph::Collapse() {
    using EBinaryOp = NYql::TKernelRequestBuilder::EBinaryOp;

    bool hasChanges = true;
    while (hasChanges) {
        hasChanges = false;
        for (auto&& [_, n] : Nodes) {
            if (n->GetProcessor()->GetProcessorType() != EProcessorType::Filter) {
                continue;
            }
            if (n->GetDataFrom().size() != 1) {
                return TConclusionStatus::Fail("incorrect filter incoming columns (!= 1) : " + ::ToString(n->GetDataFrom().size()));
            }
            auto* first = n->GetDataFrom().begin()->second;
            if (first->GetProcessor()->GetProcessorType() != EProcessorType::Calculation) {
                continue;
            }
            auto calc = first->GetProcessorAs<TCalculationProcessor>();
            if (!calc->GetYqlOperationId()) {
                continue;
            }

            const auto operation = static_cast<EBinaryOp>(*calc->GetYqlOperationId());
            if (operation == EBinaryOp::And) {
                for (auto&& c : calc->GetInput()) {
                    AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c)));
                }
            } else if (operation == EBinaryOp::Coalesce) {
                if (calc->GetInput().size() != 2) {
                    return TConclusionStatus::Fail(
                        "incorrect coalesce incoming columns (!= 2) : " + ::ToString(calc->GetInput().size()));
                }
                // NOTE(review): the second coalesce argument is not inspected here; presumably it is
                // expected to be a constant `false` default — confirm against the callers.
                TGraphNode* dataNode = GetProducerVerified(calc->GetInput().front().GetColumnId());
                for (auto&& c : dataNode->GetProcessor()->GetOutput()) {
                    AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c.GetColumnId())));
                }
            } else {
                continue;
            }

            // The original filter and the calculation feeding it are now redundant.
            // NOTE(review): RemoveNode only erases from Nodes; Producers may still hold a raw
            // pointer to `first` for its output columns — verify nothing looks them up afterwards.
            DetachNode(n.get());
            DetachNode(first);
            RemoveNode(n.get());
            RemoveNode(first);
            hasChanges = true;
            // Nodes has been modified, so the current iteration cannot continue; rescan.
            break;
        }
    }
    return TConclusionStatus::Success();
}
91+
92+
class TFilterChain {
93+
private:
94+
YDB_READONLY_DEF(std::vector<const TGraphNode*>, Nodes);
95+
ui64 Weight = 0;
96+
97+
public:
98+
TFilterChain(const std::vector<const TGraphNode*>& nodes)
99+
: Nodes(nodes) {
100+
for (auto&& i : nodes) {
101+
Weight += i->GetProcessor()->GetWeight();
102+
}
103+
}
104+
105+
bool operator<(const TFilterChain& item) const {
106+
return Weight < item.Weight;
107+
}
108+
};
109+
110+
// Linearizes the graph into an execution order of processors.
//
// 1. For every Filter node its fetching chain (the node plus everything it depends on,
//    dependencies first) is collected; the chains are sorted by ascending total weight.
// 2. The fetching chain of the single Projection node is appended last.
// 3. The chains are emitted in that order; a node that was already emitted by an earlier
//    chain is skipped, and Original (source column) nodes are never emitted.
//
// De-duplication across chains is done at emission time, i.e. after sorting. Doing it
// before sorting would assign a shared producer to whichever chain happened to be visited
// first, and the sort could then move a dependent chain in front of that producer.
// As a consequence, the weight of a chain covers all of its own dependencies, including
// the ones shared with other chains.
//
// Returns Fail when there is no Projection node or more than one.
TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain() {
    // GetFetchingChain may list a node several times (once per path leading to it); keeping
    // the first occurrence preserves the "dependencies first" order within the chain.
    const auto buildUniqueChain = [](const TGraphNode& node) {
        THashSet<i64> seenIds;
        std::vector<const TGraphNode*> uniqueChain;
        for (auto&& c : node.GetFetchingChain()) {
            if (seenIds.emplace(c->GetIdentifier()).second) {
                uniqueChain.emplace_back(c);
            }
        }
        return uniqueChain;
    };

    std::vector<TFilterChain> nodeChains;
    for (auto&& [_, i] : Nodes) {
        if (i->GetProcessor()->GetProcessorType() == EProcessorType::Filter) {
            std::vector<const TGraphNode*> chain = buildUniqueChain(*i);
            AFL_VERIFY(chain.size());
            nodeChains.emplace_back(std::move(chain));
        }
    }
    // Only filter chains are ordered by weight; the projection chain is appended afterwards.
    std::sort(nodeChains.begin(), nodeChains.end());

    bool found = false;
    for (auto&& [_, i] : Nodes) {
        if (i->GetProcessor()->GetProcessorType() == EProcessorType::Projection) {
            if (found) {
                return TConclusionStatus::Fail("detected projections duplication");
            }
            found = true;
            std::vector<const TGraphNode*> chain = buildUniqueChain(*i);
            AFL_VERIFY(chain.size());
            nodeChains.emplace_back(std::move(chain));
        }
    }
    if (!found) {
        return TConclusionStatus::Fail("not found projection node");
    }

    std::vector<std::shared_ptr<IResourceProcessor>> result;
    THashSet<i64> readyNodeIds;
    for (auto&& c : nodeChains) {
        for (auto&& p : c.GetNodes()) {
            if (p->GetProcessor()->GetProcessorType() == EProcessorType::Original) {
                continue;
            }
            if (!readyNodeIds.emplace(p->GetIdentifier()).second) {
                // Already scheduled by an earlier chain.
                continue;
            }
            result.emplace_back(p->GetProcessor());
        }
    }
    return result;
}
158+
159+
void TGraph::AddNode(const std::shared_ptr<IResourceProcessor>& processor) {
160+
auto node = std::make_shared<TGraphNode>(processor);
161+
Nodes.emplace(node->GetIdentifier(), node);
162+
for (auto&& i : processor->GetInput()) {
163+
auto nodeProducer = GetProducerVerified(i.GetColumnId());
164+
nodeProducer->AddDataTo(i.GetColumnId(), node);
165+
node->AddDataFrom(i.GetColumnId(), nodeProducer);
166+
}
167+
}
168+
169+
// Drops the node from Nodes by its identifier.
// Only the Nodes container is touched: edges are unlinked separately by DetachNode,
// and Producers is not updated here.
void TGraph::RemoveNode(TGraphNode* node) {
    const auto nodeId = node->GetIdentifier();
    Nodes.erase(nodeId);
}
172+
173+
// Unlinks `node` from its neighbours: every producer forgets the edge leading to the node
// and every consumer forgets the edge coming from it. The edge key stored on the neighbour
// is obtained by re-targeting the node's own key to the node's identifier (AnotherNodeId).
// The node's own DataFrom/DataTo collections are left as they are.
void TGraph::DetachNode(TGraphNode* node) {
    for (auto&& [address, producer] : node->GetDataFrom()) {
        producer->RemoveDataTo(address.AnotherNodeId(node->GetIdentifier()));
    }
    for (auto&& [address, consumer] : node->GetDataTo()) {
        consumer->RemoveDataFrom(address.AnotherNodeId(node->GetIdentifier()));
    }
}
181+
182+
std::vector<const TGraphNode*> TGraphNode::GetFetchingChain() const {
183+
std::vector<const TGraphNode*> result;
184+
result.emplace_back(this);
185+
ui32 frontStart = 0;
186+
ui32 frontFinish = result.size();
187+
while (frontFinish > frontStart) {
188+
for (ui32 i = frontStart; i < frontFinish; ++i) {
189+
for (auto&& input : result[i]->GetDataFrom()) {
190+
result.emplace_back(input.second);
191+
}
192+
}
193+
frontStart = frontFinish;
194+
frontFinish = result.size();
195+
}
196+
std::reverse(result.begin(), result.end());
197+
return result;
198+
}
199+
200+
} // namespace NKikimr::NArrow::NSSA::NOptimization

0 commit comments

Comments
 (0)