Skip to content

Commit 0c6af53

Browse files
fixes
1 parent ada5b3c commit 0c6af53

File tree

2 files changed

+17
-8
lines changed

2 files changed

+17
-8
lines changed

ydb/core/formats/arrow/program/graph.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ TConclusion<bool> TGraph::OptimizeFilterWithCoalesce(
134134
DetachNode(filterArg);
135135
RemoveNode(filterNode);
136136
RemoveNode(filterArg);
137+
if (argNode->GetDataFrom().empty() && argNode->GetDataTo().empty()) {
138+
RemoveNode(argNode);
139+
}
137140
return true;
138141
}
139142

@@ -193,10 +196,8 @@ TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain
193196
}
194197
}
195198
std::sort(nodeChains.begin(), nodeChains.end());
196-
ui32 foundCount = 0;
197199
for (auto&& [_, i] : Nodes) {
198200
if (i->GetProcessor()->GetProcessorType() != EProcessorType::Filter && i->GetProcessor()->GetOutput().empty()) {
199-
++foundCount;
200201
std::vector<const TGraphNode*> chain = i->GetFetchingChain();
201202
std::vector<const TGraphNode*> actualChain;
202203
for (auto&& c : chain) {
@@ -208,8 +209,16 @@ TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain
208209
nodeChains.emplace_back(std::move(actualChain));
209210
}
210211
}
211-
if (!foundCount) {
212-
return TConclusionStatus::Fail("not found projection node");
212+
if (readyNodeIds.size() != Nodes.size()) {
213+
std::set<ui32> notCoveredIds;
214+
TStringBuilder sb;
215+
for (auto&& [id, n] : Nodes) {
216+
if (!readyNodeIds.contains(id)) {
217+
sb << n->DebugJson().GetStringRobust() << "/" << n->GetProcessor()->DebugJson().GetStringRobust() << Endl;
218+
}
219+
}
220+
return TConclusionStatus::Fail(
221+
"not found final nodes: " + ::ToString(readyNodeIds.size()) + " covered from " + ::ToString(Nodes.size()) + ": details = " + sb);
213222
}
214223
std::vector<std::shared_ptr<IResourceProcessor>> result;
215224
for (auto&& c : nodeChains) {

ydb/core/formats/arrow/ut/ut_program_step.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ size_t FilterTest(const std::vector<std::shared_ptr<arrow::Array>>& args, const
4141
TProgramChain::TBuilder builder(resolver);
4242
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 2}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(op1)).DetachResult());
4343
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({4, 3}), TColumnChainInfo(5), std::make_shared<TSimpleFunction>(op2)).DetachResult());
44-
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 }), true));
44+
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 })));
4545
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 4, 5 })));
4646
auto chain = builder.Finish().DetachResult();
4747
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -61,7 +61,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, const EO
6161
TProgramChain::TBuilder builder(resolver);
6262
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(op1)).DetachResult());
6363
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2, 4}), TColumnChainInfo(5), std::make_shared<TSimpleFunction>(op2)).DetachResult());
64-
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 }), true));
64+
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 })));
6565
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 4, 5 })));
6666
auto chain = builder.Finish().DetachResult();
6767
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -488,7 +488,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
488488
TProgramChain::TBuilder builder(resolver);
489489
builder.Add(std::make_shared<TConstProcessor>(std::make_shared<arrow::Int64Scalar>(56), 3));
490490
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(EOperation::Add)).DetachResult());
491-
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 2 }), true));
491+
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 2 })));
492492
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 2, 4 })));
493493
auto chain = builder.Finish().DetachResult();
494494
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -563,7 +563,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
563563
auto andOperator2 = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1104, 1103}), TColumnChainInfo(1105), std::make_shared<TSimpleFunction>(EOperation::And)).DetachResult();
564564
andOperator2->SetYqlOperationId(0);
565565
builder.Add(andOperator2);
566-
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 1105 }), true));
566+
builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 1105 })));
567567
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 1, 2 })));
568568
auto chain = builder.Finish().DetachResult();
569569
Cerr << chain->DebugJson() << Endl;

0 commit comments

Comments
 (0)