Skip to content

Commit 5542d97

Browse files
Merge caf92fa into 6afe3c8
2 parents 6afe3c8 + caf92fa commit 5542d97

File tree

11 files changed

+576
-7
lines changed

11 files changed

+576
-7
lines changed

ydb/core/formats/arrow/program/abstract.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ enum class EProcessorType {
164164
Calculation,
165165
Projection,
166166
Filter,
167-
Aggregation
167+
Aggregation,
168+
Original
168169
};
169170

170171
class TFetchingInfo {
@@ -198,8 +199,15 @@ class IResourceProcessor {
198199
virtual NJson::TJsonValue DoDebugJson() const {
199200
return NJson::JSON_MAP;
200201
}
202+
// Overridable hook behind GetWeight(). The base implementation reports a
// weight of 0; subclasses (e.g. TCalculationProcessor in this commit) override
// it to report a non-zero weight.
virtual ui64 DoGetWeight() const {
203+
return 0;
204+
}
201205

202206
public:
207+
// Public, non-virtual accessor for the processor weight; simply forwards to
// the DoGetWeight() virtual hook.
ui64 GetWeight() const {
208+
return DoGetWeight();
209+
}
210+
203211
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
204212

205213
virtual bool IsAggregation() const = 0;

ydb/core/formats/arrow/program/assign_internal.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "functions.h"
44
#include "kernel_logic.h"
55

6+
#include <yql/essentials/core/arrow_kernels/request/request.h>
7+
68
namespace NKikimr::NArrow::NSSA {
79

810
class TCalculationProcessor: public IResourceProcessor {
@@ -27,6 +29,23 @@ class TCalculationProcessor: public IResourceProcessor {
2729
return Function->IsAggregation();
2830
}
2931

32+
// Weight of this calculation step.
//   * custom kernel logic present            -> 0
//   * no YQL operation id                    -> 10
//   * StartsWith / EndsWith                  -> 7
//   * StringContains                         -> 10
//   * Equals                                 -> 5
//   * any other operation id                 -> 0
virtual ui64 DoGetWeight() const override {
    if (KernelLogic) {
        return 0;
    }
    if (!YqlOperationId) {
        return 10;
    }
    using EOperation = NYql::TKernelRequestBuilder::EBinaryOp;
    // The stored id is interpreted as a binary-operation code, exactly once.
    switch (static_cast<EOperation>(*YqlOperationId)) {
        case EOperation::StartsWith:
        case EOperation::EndsWith:
            return 7;
        case EOperation::StringContains:
            return 10;
        case EOperation::Equals:
            return 5;
        default:
            return 0;
    }
}
48+
3049
public:
3150
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType,
3251
const std::shared_ptr<TAccessorsCollection>& resources) const override {

ydb/core/formats/arrow/program/chain.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "chain.h"
22
#include "collection.h"
3+
#include "graph.h"
34

45
namespace NKikimr::NArrow::NSSA {
56

@@ -39,7 +40,17 @@ class TColumnUsage {
3940
};
4041
} // namespace
4142

42-
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
43+
TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processorsExt, const IColumnResolver& resolver) {
44+
NOptimization::TGraph graph(std::move(processorsExt), resolver);
45+
auto conclusion = graph.Collapse();
46+
if (conclusion.IsFail()) {
47+
return conclusion;
48+
}
49+
auto processorsConclusion = graph.BuildChain();
50+
if (processorsConclusion.IsFail()) {
51+
return processorsConclusion;
52+
}
53+
auto processors = processorsConclusion.DetachResult();
4354
THashMap<ui32, TColumnUsage> contextUsage;
4455
ui32 stepIdx = 0;
4556
THashSet<ui32> sourceColumns;

ydb/core/formats/arrow/program/filter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class TFilterVisitor: public arrow::ArrayVisitor {
6060
arrow::Status VisitImpl(const TArray& array) {
6161
AFL_VERIFY(Started);
6262
for (ui32 i = 0; i < array.length(); ++i) {
63-
const bool columnValue = (bool)array.Value(i);
63+
const bool columnValue = !array.IsNull(i) && (bool)array.Value(i);
6464
const ui32 currentIdx = CursorIdx++;
6565
FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue;
6666
}
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
#include "assign_const.h"
2+
#include "assign_internal.h"
3+
#include "filter.h"
4+
#include "graph.h"
5+
#include "original.h"
6+
7+
#include <ydb/library/formats/arrow/switch/switch_type.h>
8+
9+
#include <yql/essentials/core/arrow_kernels/request/request.h>
10+
11+
namespace NKikimr::NArrow::NSSA::NOptimization {
12+
13+
// Builds the processing graph from a list of processors.
//
// Pass 1: every processor becomes a node and is registered as the producer of
//         each of its output columns (a column may have only one producer).
//         For every input column that has no producer yet and for which the
//         resolver returns a non-empty name, a synthetic "original column"
//         node is created and registered as that column's producer.
// Pass 2: every node is linked with the producers of all its input columns
//         (producer -> consumer via AddDataTo, consumer -> producer via
//         AddDataFrom).
TGraph::TGraph(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
    for (auto&& processor : processors) {
        auto node = std::make_shared<TGraphNode>(processor);
        Nodes.emplace(node->GetIdentifier(), node);

        for (auto&& output : processor->GetOutput()) {
            AFL_VERIFY(Producers.emplace(output.GetColumnId(), node.get()).second);
        }

        for (auto&& input : processor->GetInput()) {
            const bool alreadyProduced = Producers.find(input.GetColumnId()) != Producers.end();
            if (alreadyProduced) {
                continue;
            }
            const TString name = resolver.GetColumnName(input.GetColumnId(), false);
            if (name.empty()) {
                continue;
            }
            // NOTE(review): the name is resolved a second time here (without the
            // `false` argument), mirroring the original code — presumably it yields
            // the same value as `name`; confirm before reusing `name` directly.
            auto originalProcessor =
                std::make_shared<TOriginalColumnProcessor>(input.GetColumnId(), resolver.GetColumnName(input.GetColumnId()));
            auto nodeInput = std::make_shared<TGraphNode>(originalProcessor);
            Nodes.emplace(nodeInput->GetIdentifier(), nodeInput);
            Producers.emplace(input.GetColumnId(), nodeInput.get());
        }
    }

    for (auto&& [_, consumer] : Nodes) {
        for (auto&& input : consumer->GetProcessor()->GetInput()) {
            auto producer = GetProducerVerified(input.GetColumnId());
            producer->AddDataTo(input.GetColumnId(), consumer);
            consumer->AddDataFrom(input.GetColumnId(), producer);
        }
    }
}
41+
42+
// Tries to rewrite a single filter node.
//
// Returns:
//   * false  - the node is not a filter, its argument is not a calculation
//              with a YQL operation id, or no rewrite matched;
//   * true   - one of the rewrites (And-splitting, then Coalesce-stripping)
//              changed the graph;
//   * a failed conclusion - the filter does not have exactly one incoming
//              column, or a rewrite reported an error.
TConclusion<bool> TGraph::OptimizeFilter(TGraphNode* filterNode) {
    if (filterNode->GetProcessor()->GetProcessorType() != EProcessorType::Filter) {
        return false;
    }

    const auto incomingCount = filterNode->GetDataFrom().size();
    if (incomingCount != 1) {
        return TConclusionStatus::Fail("incorrect filter incoming columns (!= 1) : " + ::ToString(incomingCount));
    }

    auto* argument = filterNode->GetDataFrom().begin()->second;
    if (argument->GetProcessor()->GetProcessorType() != EProcessorType::Calculation) {
        return false;
    }

    auto calc = argument->GetProcessorAs<TCalculationProcessor>();
    if (!calc->GetYqlOperationId()) {
        return false;
    }

    // Rewrites are attempted in a fixed order; the first one that applies wins.
    if (auto andResult = OptimizeFilterWithAnd(filterNode, argument, calc); andResult.IsFail()) {
        return andResult;
    } else if (*andResult) {
        return true;
    }

    if (auto coalesceResult = OptimizeFilterWithCoalesce(filterNode, argument, calc); coalesceResult.IsFail()) {
        return coalesceResult;
    } else if (*coalesceResult) {
        return true;
    }

    return false;
}
77+
78+
// Rewrites `Filter(And(a, b, ...))` into separate filters, one per And input.
//
// Parameters:
//   filterNode - the filter node being optimized;
//   filterArg  - the node producing the filter's single input (the And calculation);
//   calc       - the calculation processor of `filterArg`; its YQL operation id
//                must be set (checked by the caller, OptimizeFilter).
//
// Returns false when `calc` is not an And operation, a failed conclusion when
// the And has fewer than two inputs, and true after the graph was rewritten:
// a new filter node is added for every And input, then the original filter and
// And nodes are detached from their neighbours and removed from the graph.
TConclusion<bool> TGraph::OptimizeFilterWithAnd(
    TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc) {
    using EBinaryOp = NYql::TKernelRequestBuilder::EBinaryOp;
    if (static_cast<EBinaryOp>(*calc->GetYqlOperationId()) != EBinaryOp::And) {
        return false;
    }
    if (calc->GetInput().size() < 2) {
        return TConclusionStatus::Fail("incorrect and operation incoming columns (< 2) : " + ::ToString(calc->GetInput().size()));
    }
    for (auto&& c : calc->GetInput()) {
        AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c)));
    }
    // Detach both nodes before removing either one, so neighbour links are
    // cleaned up while the nodes are still registered in the graph.
    DetachNode(filterNode);
    DetachNode(filterArg);
    RemoveNode(filterNode);
    RemoveNode(filterArg);
    // The unconditional debug dump of the whole graph to Cerr that used to be
    // here was removed: it wrote to stderr on every successful rewrite.
    return true;
}
96+
97+
// Rewrites `Filter(Coalesce(data, const))` into a filter applied directly to
// the output of the node producing `data`, when `const` is a c-type scalar
// equal to 0.
//
// Parameters:
//   filterNode - the filter node being optimized;
//   filterArg  - the node producing the filter's single input (the Coalesce calculation);
//   calc       - the calculation processor of `filterArg`; its YQL operation id
//                must be set (checked by the caller, OptimizeFilter).
//
// Returns:
//   * false - `calc` is not Coalesce, its second argument is not produced by a
//             Const processor, or the constant is not a c-type scalar equal to 0;
//   * a failed conclusion - Coalesce does not have exactly two inputs, or the
//             constant processor holds no scalar;
//   * true  - the graph was rewritten.
TConclusion<bool> TGraph::OptimizeFilterWithCoalesce(
    TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc) {
    using EBinaryOp = NYql::TKernelRequestBuilder::EBinaryOp;
    if (static_cast<EBinaryOp>(*calc->GetYqlOperationId()) != EBinaryOp::Coalesce) {
        return false;
    }
    if (calc->GetInput().size() != 2) {
        return TConclusionStatus::Fail("incorrect coalesce incoming columns (!= 2) : " + ::ToString(calc->GetInput().size()));
    }
    TGraphNode* dataNode = GetProducerVerified(calc->GetInput()[0].GetColumnId());
    TGraphNode* argNode = GetProducerVerified(calc->GetInput()[1].GetColumnId());
    if (argNode->GetProcessor()->GetProcessorType() != EProcessorType::Const) {
        return false;
    }
    auto scalar = argNode->GetProcessorAs<TConstProcessor>()->GetScalarConstant();
    if (!scalar) {
        return TConclusionStatus::Fail("coalesce with null arg is impossible");
    }

    // `scalar` is known to be non-null from here on, so no further null check
    // is needed. Only scalars of types with a C representation are inspected;
    // for every other type the rewrite is not applied.
    bool doOptimize = false;
    NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
        using TWrap = std::decay_t<decltype(type)>;
        using T = typename TWrap::T;
        if constexpr (arrow::has_c_type<T>()) {
            using TScalar = typename arrow::TypeTraits<T>::ScalarType;
            doOptimize = (static_cast<const TScalar&>(*scalar).value == 0);
        }
        return true;
    });
    if (!doOptimize) {
        return false;
    }

    // NOTE(review): a filter is added for every output of the data producer,
    // not only for the column actually passed to Coalesce — confirm this is
    // intended for producers with several outputs.
    for (auto&& c : dataNode->GetProcessor()->GetOutput()) {
        AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c.GetColumnId())));
    }
    DetachNode(filterNode);
    DetachNode(filterArg);
    RemoveNode(filterNode);
    RemoveNode(filterArg);
    return true;
}
139+
140+
// Applies filter rewrites until a full pass over the nodes changes nothing.
//
// After every successful rewrite the scan over `Nodes` is restarted from
// scratch, because the rewrite adds and removes nodes. Returns the failure of
// the first rewrite that reports an error, otherwise success.
TConclusionStatus TGraph::Collapse() {
    for (bool graphChanged = true; graphChanged;) {
        graphChanged = false;
        for (auto&& [_, node] : Nodes) {
            auto rewriteResult = OptimizeFilter(node.get());
            if (rewriteResult.IsFail()) {
                return rewriteResult;
            }
            if (*rewriteResult) {
                // The node set was modified: stop iterating it and rescan.
                graphChanged = true;
                break;
            }
        }
    }
    return TConclusionStatus::Success();
}
160+
161+
class TFilterChain {
162+
private:
163+
YDB_READONLY_DEF(std::vector<const TGraphNode*>, Nodes);
164+
ui64 Weight = 0;
165+
166+
public:
167+
TFilterChain(const std::vector<const TGraphNode*>& nodes)
168+
: Nodes(nodes) {
169+
for (auto&& i : nodes) {
170+
Weight += i->GetProcessor()->GetWeight();
171+
}
172+
}
173+
174+
bool operator<(const TFilterChain& item) const {
175+
return Weight < item.Weight;
176+
}
177+
};
178+
179+
// Linearizes the graph into an ordered list of processors.
//
// 1. For every filter node, its fetching chain is collected, keeping only
//    nodes that were not already placed into an earlier chain.
// 2. Filter chains are sorted by ascending total weight.
// 3. The chain of the single projection node is appended last; zero or more
//    than one projection node is reported as a failure.
// 4. Chains are flattened in order, skipping "original column" nodes.
//
// NOTE(review): nodes are de-duplicated across chains before the chains are
// sorted, so a chain may end up ahead of the chain that holds a producer they
// share — confirm the resulting order is always valid.
TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain() {
    std::vector<TFilterChain> nodeChains;
    THashSet<i64> readyNodeIds;

    // Appends to `nodeChains` the part of `root`'s fetching chain that has not
    // been scheduled yet; at least the root itself must be new.
    const auto appendChainOf = [&nodeChains, &readyNodeIds](const TGraphNode& root) {
        std::vector<const TGraphNode*> actualChain;
        for (const TGraphNode* candidate : root.GetFetchingChain()) {
            const bool firstOccurrence = readyNodeIds.emplace(candidate->GetIdentifier()).second;
            if (firstOccurrence) {
                actualChain.emplace_back(candidate);
            }
        }
        AFL_VERIFY(actualChain.size());
        nodeChains.emplace_back(std::move(actualChain));
    };

    for (auto&& [_, node] : Nodes) {
        if (node->GetProcessor()->GetProcessorType() == EProcessorType::Filter) {
            appendChainOf(*node);
        }
    }
    std::sort(nodeChains.begin(), nodeChains.end());

    bool projectionSeen = false;
    for (auto&& [_, node] : Nodes) {
        if (node->GetProcessor()->GetProcessorType() != EProcessorType::Projection) {
            continue;
        }
        if (projectionSeen) {
            return TConclusionStatus::Fail("detected projections duplication");
        }
        projectionSeen = true;
        appendChainOf(*node);
    }
    if (!projectionSeen) {
        return TConclusionStatus::Fail("not found projection node");
    }

    std::vector<std::shared_ptr<IResourceProcessor>> result;
    for (auto&& chain : nodeChains) {
        for (auto&& node : chain.GetNodes()) {
            const bool isOriginal = node->GetProcessor()->GetProcessorType() == EProcessorType::Original;
            if (!isOriginal) {
                result.emplace_back(node->GetProcessor());
            }
        }
    }
    return result;
}
227+
228+
void TGraph::AddNode(const std::shared_ptr<IResourceProcessor>& processor) {
229+
auto node = std::make_shared<TGraphNode>(processor);
230+
Nodes.emplace(node->GetIdentifier(), node);
231+
for (auto&& i : processor->GetInput()) {
232+
auto nodeProducer = GetProducerVerified(i.GetColumnId());
233+
nodeProducer->AddDataTo(i.GetColumnId(), node);
234+
node->AddDataFrom(i.GetColumnId(), nodeProducer);
235+
}
236+
}
237+
238+
// Erases the node from `Nodes` by its identifier. Links held by neighbouring
// nodes are not touched here; see DetachNode for that.
void TGraph::RemoveNode(TGraphNode* node) {
    const auto nodeId = node->GetIdentifier();
    Nodes.erase(nodeId);
}
241+
242+
// Removes the references to `node` held by its neighbours: each producer drops
// its outgoing link to `node`, and each consumer drops its incoming link from
// `node`. The node's own link tables are left as they are.
void TGraph::DetachNode(TGraphNode* node) {
    for (auto&& [address, producer] : node->GetDataFrom()) {
        producer->RemoveDataTo(address.AnotherNodeId(node->GetIdentifier()));
    }
    for (auto&& [address, consumer] : node->GetDataTo()) {
        consumer->RemoveDataFrom(address.AnotherNodeId(node->GetIdentifier()));
    }
}
250+
251+
std::vector<const TGraphNode*> TGraphNode::GetFetchingChain() const {
252+
std::vector<const TGraphNode*> result;
253+
result.emplace_back(this);
254+
ui32 frontStart = 0;
255+
ui32 frontFinish = result.size();
256+
while (frontFinish > frontStart) {
257+
for (ui32 i = frontStart; i < frontFinish; ++i) {
258+
for (auto&& input : result[i]->GetDataFrom()) {
259+
result.emplace_back(input.second);
260+
}
261+
}
262+
frontStart = frontFinish;
263+
frontFinish = result.size();
264+
}
265+
std::reverse(result.begin(), result.end());
266+
return result;
267+
}
268+
269+
} // namespace NKikimr::NArrow::NSSA::NOptimization

0 commit comments

Comments
 (0)