Skip to content

Commit 26a80d3

Browse files
authored
Merge 5c3c867 into 6c60d46
2 parents 6c60d46 + 5c3c867 commit 26a80d3

File tree

4 files changed

+114
-103
lines changed

4 files changed

+114
-103
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 86 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
2424
"ends_with"
2525
};
2626

27+
// Folds a list of predicates into a single expression:
//   - no predicates -> an empty TMaybeNode (there is nothing to combine);
//   - one predicate -> that predicate, returned as is;
//   - two or more   -> one TKqpOlapAnd node with all predicates added in order.
static TMaybeNode<TExprBase> CombinePrdicateWithOlapAnd(const TVector<TExprBase>& preds, TExprContext& ctx, TPositionHandle pos) {
    switch (preds.size()) {
        case 0:
            return {};
        case 1:
            return preds[0];
        default:
            return Build<TKqpOlapAnd>(ctx, pos)
                .Add(preds)
                .Done();
    }
}
38+
2739
struct TFilterOpsLevels {
2840
TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel)
2941
: FirstLevelOps(firstLevel)
@@ -69,6 +81,23 @@ struct TFilterOpsLevels {
6981
}
7082

7183

84+
// Merges several per-predicate pushdown results into one TFilterOpsLevels.
// Valid FirstLevelOps / SecondLevelOps of every input are gathered separately and each
// group is combined with CombinePrdicateWithOlapAnd, so a level that received no valid
// input stays an empty TMaybeNode in the result.
// `predicates` is taken by const reference: the function only reads it, and taking it by
// value copied the whole vector on every call made with an lvalue.
static TFilterOpsLevels Merge(const TVector<TFilterOpsLevels>& predicates, TExprContext& ctx, TPositionHandle pos) {
    TVector<TExprBase> predicatesFirstLevel;
    TVector<TExprBase> predicatesSecondLevel;
    // Each input contributes at most one node per level.
    predicatesFirstLevel.reserve(predicates.size());
    predicatesSecondLevel.reserve(predicates.size());
    for (const auto& p : predicates) {
        if (p.FirstLevelOps.IsValid()) {
            predicatesFirstLevel.emplace_back(p.FirstLevelOps.Cast());
        }
        if (p.SecondLevelOps.IsValid()) {
            predicatesSecondLevel.emplace_back(p.SecondLevelOps.Cast());
        }
    }
    return {
        CombinePrdicateWithOlapAnd(predicatesFirstLevel, ctx, pos),
        CombinePrdicateWithOlapAnd(predicatesSecondLevel, ctx, pos),
    };
}
100+
72101
TMaybeNode<TExprBase> FirstLevelOps;
73102
TMaybeNode<TExprBase> SecondLevelOps;
74103
};
@@ -674,49 +703,39 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, const TExprNode&
674703
}
675704

676705
// Collapses `predicates` into a single predicate node.
//   ExprNode:    literal `true` for an empty list, the predicate itself for a single
//                element, otherwise a TCoAnd over all predicates in order.
//   CanBePushed: true only when every input predicate can be pushed (true for an empty list).
// The conjunction is built with TCoAnd rather than TKqpOlapAnd: the result is used as a
// TCoOptionalIf predicate and later re-split by code that recognises conjunctions via
// TCoAnd::Match, which a TKqpOlapAnd node would not satisfy.
TOLAPPredicateNode WrapPredicates(const std::vector<TOLAPPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
    TOLAPPredicateNode result;
    result.CanBePushed = true;

    TVector<NNodes::TExprBase> exprNodes;
    exprNodes.reserve(predicates.size());
    for (const auto& pred : predicates) {
        exprNodes.emplace_back(pred.ExprNode);
        result.CanBePushed &= pred.CanBePushed;
    }

    if (exprNodes.empty()) {
        // Nothing is left to check: the remaining filter is trivially true.
        result.ExprNode = MakeBool<true>(pos, ctx);
    } else if (exprNodes.size() == 1) {
        result.ExprNode = exprNodes.front().Ptr();
    } else {
        result.ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
            .Add(exprNodes)
            .Done().Ptr();
    }
    return result;
}
700722

701-
void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredicateNode& predicatesToPush, TOLAPPredicateNode& remainingPredicates,
702-
TExprContext& ctx, TPositionHandle pos)
723+
std::pair<std::vector<TOLAPPredicateNode>, std::vector<TOLAPPredicateNode>> SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree)
703724
{
704725
if (predicateTree.CanBePushed) {
705-
predicatesToPush = predicateTree;
706-
remainingPredicates.ExprNode = MakeBool<true>(pos, ctx);
707-
return;
726+
return {{predicateTree}, {}};
708727
}
709728

710729
if (!TCoAnd::Match(predicateTree.ExprNode.Get())) {
711730
// We can partially pushdown predicates from AND operator only.
712731
// For OR operator we would need to have several read operators which is not acceptable.
713732
// TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
714-
remainingPredicates = predicateTree;
715-
return;
733+
return {{}, {predicateTree}};
716734
}
717735

718736
bool isFoundNotStrictOp = false;
719-
std::vector<TOLAPPredicateNode> pushable, remaining;
737+
std::vector<TOLAPPredicateNode> pushable;
738+
std::vector<TOLAPPredicateNode> remaining;
720739
for (const auto& predicate : predicateTree.Children) {
721740
if (predicate.CanBePushed && !isFoundNotStrictOp) {
722741
pushable.emplace_back(predicate);
@@ -727,8 +746,7 @@ void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredi
727746
remaining.emplace_back(predicate);
728747
}
729748
}
730-
predicatesToPush = WrapPredicates(pushable, ctx, pos);
731-
remainingPredicates = WrapPredicates(remaining, ctx, pos);
749+
return {pushable, remaining};
732750
}
733751

734752
} // anonymous namespace end
@@ -752,6 +770,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
752770
}
753771

754772
const auto& lambda = flatmap.Lambda();
773+
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
755774

756775
YQL_CLOG(TRACE, ProviderKqp) << "Initial OLAP lambda: " << KqpExprToPrettyString(lambda, ctx);
757776

@@ -764,42 +783,53 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
764783
auto predicate = optionaIf.Predicate();
765784
auto value = optionaIf.Value();
766785

767-
if constexpr (NSsa::RuntimeVersion >= 5U) {
768-
TExprNode::TPtr afterPeephole;
769-
bool hasNonDeterministicFunctions;
770-
if (const auto status = PeepHoleOptimizeNode(optionaIf.Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
771-
status != IGraphTransformer::TStatus::Ok) {
772-
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
773-
return node;
774-
}
775-
776-
const TCoIf simplified(std::move(afterPeephole));
777-
predicate = simplified.Predicate();
778-
value = simplified.ThenValue().Cast<TCoJust>().Input();
779-
}
780-
781786
TOLAPPredicateNode predicateTree;
782787
predicateTree.ExprNode = predicate.Ptr();
783-
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
784-
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body());
788+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), false);
785789
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
786790

787-
TOLAPPredicateNode predicatesToPush, remainingPredicates;
788-
SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
789-
if (!predicatesToPush.IsValid()) {
790-
return node;
791+
auto [pushable, remaining] = SplitForPartialPushdown(predicateTree);
792+
TVector<TFilterOpsLevels> pushedPredicates;
793+
for (const auto& p: pushable) {
794+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
791795
}
792796

793-
YQL_ENSURE(predicatesToPush.IsValid(), "Predicates to push is invalid");
794-
YQL_ENSURE(remainingPredicates.IsValid(), "Remaining predicates is invalid");
797+
if constexpr (NSsa::RuntimeVersion >= 5U) {
798+
if (!remaining.empty()) {
799+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
800+
const auto recoveredOptinalIfForNonPushedDownPredicates = Build<TCoOptionalIf>(ctx, node.Pos())
801+
.Predicate(remainingPredicates.ExprNode)
802+
.Value(value)
803+
.Build();
804+
TExprNode::TPtr afterPeephole;
805+
bool hasNonDeterministicFunctions;
806+
if (const auto status = PeepHoleOptimizeNode(recoveredOptinalIfForNonPushedDownPredicates.Value().Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
807+
status != IGraphTransformer::TStatus::Ok) {
808+
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
809+
return node;
810+
}
811+
const TCoIf simplified(std::move(afterPeephole));
812+
predicate = simplified.Predicate();
813+
value = simplified.ThenValue().Cast<TCoJust>().Input();
814+
815+
TOLAPPredicateNode predicateTree;
816+
predicateTree.ExprNode = predicate.Ptr();
817+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), true);
818+
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
819+
auto [pushableWithApply, remaining2] = SplitForPartialPushdown(predicateTree);
820+
for (const auto& p: pushableWithApply) {
821+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
822+
}
823+
remaining = std::move(remaining2);
824+
}
825+
}
826+
827+
const auto& pushedFilters = TFilterOpsLevels::Merge(pushedPredicates, ctx, node.Pos());
795828

796-
const auto pushedFilters = PredicatePushdown(TExprBase(predicatesToPush.ExprNode), lambdaArg, ctx, node.Pos());
829+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
797830
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
798831
// YQL_ENSURE(pushedFilters.IsValid(), "Pushed predicate should be always valid!");
799832

800-
if (!pushedFilters.IsValid()) {
801-
return node;
802-
}
803833

804834
TMaybeNode<TExprBase> olapFilter;
805835
if (pushedFilters.FirstLevelOps.IsValid()) {
@@ -824,7 +854,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
824854
.Build()
825855
.Done();
826856

827-
YQL_CLOG(TRACE, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx);
857+
// Diagnostic dump of the rewritten lambda. Logged at TRACE, the same level as the
// "Initial OLAP lambda" dump, so that a successful pushdown is not reported as an error.
YQL_CLOG(TRACE, ProviderKqp) << "Pushed OLAP lambda: " << KqpExprToPrettyString(newProcessLambda, ctx);
828858

829859
#ifdef ENABLE_COLUMNS_PRUNING
830860
TMaybeNode<TCoAtomList> readColumns = BuildColumnsFromLambda(lambda, ctx, node.Pos());

ydb/core/kqp/opt/physical/predicate_collector.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ bool ExistsCanBePushed(const TCoExists& exists, const TExprNode* lambdaArg) {
336336
return IsMemberColumn(exists.Optional(), lambdaArg);
337337
}
338338

339-
void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
339+
void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, bool allowOlapApply) {
340340
predicateTree.Children.reserve(opNode.ChildrenSize());
341341
predicateTree.CanBePushed = true;
342342
for (const auto& childNodePtr: opNode.Children()) {
@@ -345,23 +345,23 @@ void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& pred
345345
if (const auto maybeCtor = TMaybeNode<TCoDataCtor>(child.ExprNode))
346346
child.CanBePushed = IsSupportedDataType(maybeCtor.Cast());
347347
else
348-
CollectPredicates(TExprBase(child.ExprNode), child, lambdaArg, lambdaBody);
348+
CollectPredicates(TExprBase(child.ExprNode), child, lambdaArg, lambdaBody, allowOlapApply);
349349
predicateTree.Children.emplace_back(child);
350350
predicateTree.CanBePushed &= child.CanBePushed;
351351
}
352352
}
353353

354354
}
355355

356-
void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
356+
void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, bool allowOlapApply) {
357357
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
358358
if (predicate.Maybe<TCoIf>() || predicate.Maybe<TCoJust>() || predicate.Maybe<TCoCoalesce>()) {
359-
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody);
359+
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, allowOlapApply);
360360
}
361361
}
362362

363363
if (predicate.Maybe<TCoNot>() || predicate.Maybe<TCoAnd>() || predicate.Maybe<TCoOr>() || predicate.Maybe<TCoXor>()) {
364-
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody);
364+
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, allowOlapApply);
365365
} else if (const auto maybeCoalesce = predicate.Maybe<TCoCoalesce>()) {
366366
predicateTree.CanBePushed = CoalesceCanBePushed(maybeCoalesce.Cast(), lambdaArg, lambdaBody);
367367
} else if (const auto maybeCompare = predicate.Maybe<TCoCompare>()) {
@@ -371,11 +371,7 @@ void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicate
371371
} else if (const auto maybeJsonExists = predicate.Maybe<TCoJsonExists>()) {
372372
predicateTree.CanBePushed = JsonExistsCanBePushed(maybeJsonExists.Cast(), lambdaArg);
373373
} else {
374-
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
375-
predicateTree.CanBePushed = AbstractTreeCanBePushed(predicate, lambdaArg);
376-
} else {
377-
predicateTree.CanBePushed = false;
378-
}
374+
predicateTree.CanBePushed = allowOlapApply ? AbstractTreeCanBePushed(predicate, lambdaArg) : false;
379375
}
380376
}
381377

ydb/core/kqp/opt/physical/predicate_collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ struct TOLAPPredicateNode {
1515
}
1616
};
1717

18-
void CollectPredicates(const NNodes::TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody);
18+
// Fills `predicateTree` for `predicate`, descending into child expressions and setting
// CanBePushed on each node.
// allowOlapApply: when false, a predicate that matches none of the dedicated checks is
// marked as not pushable; when true, such a predicate is checked with AbstractTreeCanBePushed.
void CollectPredicates(const NNodes::TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody, bool allowOlapApply);
1919

2020
}

0 commit comments

Comments
 (0)