Skip to content

Commit 542e1d3

Browse files
committed
Do not use OlapApply for known functions pushdown
1 parent f43e9dd commit 542e1d3

File tree

4 files changed

+113
-103
lines changed

4 files changed

+113
-103
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 85 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
2424
"ends_with"
2525
};
2626

27+
// Folds a list of predicate expressions into a single optional expression.
//   - empty list:   returns an empty TMaybeNode (there is nothing to combine);
//   - one element:  returns that element as is, without wrapping it;
//   - otherwise:    builds one TKqpOlapAnd node at position `pos` with every
//                   element of `preds` added to it.
// NOTE(review): the name is misspelled ("Prdicate"); it is left untouched here
// because other code in this file calls the function by this exact name.
static TMaybeNode<TExprBase> CombinePrdicateWithOlapAnd(const TVector<TExprBase>& preds, TExprContext& ctx, TPositionHandle pos) {
28+
if (preds.empty()) {
29+
return {};
30+
} else if (preds.size() == 1) {
31+
return preds[0];
32+
} else {
33+
return Build<TKqpOlapAnd>(ctx, pos)
34+
.Add(preds)
35+
.Done();
36+
}
37+
}
38+
2739
struct TFilterOpsLevels {
2840
TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel)
2941
: FirstLevelOps(firstLevel)
@@ -69,6 +81,23 @@ struct TFilterOpsLevels {
6981
}
7082

7183

84+
// Merges several pushdown results into a single TFilterOpsLevels.
// The valid FirstLevelOps of all entries are gathered (in input order) into one
// list and the valid SecondLevelOps into another; an entry whose level is not
// valid simply contributes nothing to that level. Each list is then combined
// with CombinePrdicateWithOlapAnd, so a level with no valid entries ends up as
// an empty TMaybeNode in the result.
// NOTE(review): `predicates` is taken by value although it is only read here;
// a const reference would avoid copying the vector - confirm against callers.
static TFilterOpsLevels Merge(TVector<TFilterOpsLevels> predicates, TExprContext& ctx, TPositionHandle pos) {
85+
TVector<TExprBase> predicatesFirstLevel;
86+
TVector<TExprBase> predicatesSecondLevel;
87+
for (const auto& p: predicates) {
88+
if (p.FirstLevelOps.IsValid()) {
89+
predicatesFirstLevel.emplace_back(p.FirstLevelOps.Cast());
90+
}
91+
if (p.SecondLevelOps.IsValid()) {
92+
predicatesSecondLevel.emplace_back(p.SecondLevelOps.Cast());
93+
}
94+
}
95+
return {
96+
CombinePrdicateWithOlapAnd(predicatesFirstLevel, ctx, pos),
97+
CombinePrdicateWithOlapAnd(predicatesSecondLevel, ctx, pos),
98+
};
99+
}
100+
72101
TMaybeNode<TExprBase> FirstLevelOps;
73102
TMaybeNode<TExprBase> SecondLevelOps;
74103
};
@@ -674,49 +703,38 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, const TExprNode&
674703
}
675704

676705
TOLAPPredicateNode WrapPredicates(const std::vector<TOLAPPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
677-
if (predicates.empty()) {
678-
return {};
679-
}
680-
681-
if (const auto predicatesSize = predicates.size(); 1U == predicatesSize) {
682-
return predicates.front();
706+
707+
TOLAPPredicateNode result;
708+
result.CanBePushed = true;
709+
TVector<NNodes::TExprBase> exprNodes;
710+
for (const auto& pred : predicates) {
711+
exprNodes.emplace_back(pred.ExprNode);
712+
result.CanBePushed &= pred.CanBePushed;
713+
}
714+
if (exprNodes.empty()) {
715+
result.ExprNode = MakeBool<true>(pos, ctx);
683716
} else {
684-
TOLAPPredicateNode result;
685-
result.Children = predicates;
686-
result.CanBePushed = true;
687-
688-
TVector<NNodes::TExprBase> exprNodes;
689-
exprNodes.reserve(predicatesSize);
690-
for (const auto& pred : predicates) {
691-
exprNodes.emplace_back(pred.ExprNode);
692-
result.CanBePushed &= pred.CanBePushed;
693-
}
694-
result.ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
695-
.Add(exprNodes)
696-
.Done().Ptr();
697-
return result;
717+
result.ExprNode = CombinePrdicateWithOlapAnd(exprNodes, ctx, pos).Cast().Ptr();
698718
}
719+
return result;
699720
}
700721

701-
void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredicateNode& predicatesToPush, TOLAPPredicateNode& remainingPredicates,
702-
TExprContext& ctx, TPositionHandle pos)
722+
std::pair<std::vector<TOLAPPredicateNode>, std::vector<TOLAPPredicateNode>> SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree)
703723
{
704724
if (predicateTree.CanBePushed) {
705-
predicatesToPush = predicateTree;
706-
remainingPredicates.ExprNode = MakeBool<true>(pos, ctx);
707-
return;
725+
return {{predicateTree}, {}};
708726
}
709727

710728
if (!TCoAnd::Match(predicateTree.ExprNode.Get())) {
711729
// We can partially pushdown predicates from AND operator only.
712730
// For OR operator we would need to have several read operators which is not acceptable.
713731
// TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
714-
remainingPredicates = predicateTree;
715-
return;
732+
return {{}, {predicateTree}};
716733
}
717734

718735
bool isFoundNotStrictOp = false;
719-
std::vector<TOLAPPredicateNode> pushable, remaining;
736+
std::vector<TOLAPPredicateNode> pushable;
737+
std::vector<TOLAPPredicateNode> remaining;
720738
for (const auto& predicate : predicateTree.Children) {
721739
if (predicate.CanBePushed && !isFoundNotStrictOp) {
722740
pushable.emplace_back(predicate);
@@ -727,8 +745,7 @@ void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredi
727745
remaining.emplace_back(predicate);
728746
}
729747
}
730-
predicatesToPush = WrapPredicates(pushable, ctx, pos);
731-
remainingPredicates = WrapPredicates(remaining, ctx, pos);
748+
return {pushable, remaining};
732749
}
733750

734751
} // anonymous namespace end
@@ -752,6 +769,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
752769
}
753770

754771
const auto& lambda = flatmap.Lambda();
772+
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
755773

756774
YQL_CLOG(TRACE, ProviderKqp) << "Initial OLAP lambda: " << KqpExprToPrettyString(lambda, ctx);
757775

@@ -764,42 +782,53 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
764782
auto predicate = optionaIf.Predicate();
765783
auto value = optionaIf.Value();
766784

767-
if constexpr (NSsa::RuntimeVersion >= 5U) {
768-
TExprNode::TPtr afterPeephole;
769-
bool hasNonDeterministicFunctions;
770-
if (const auto status = PeepHoleOptimizeNode(optionaIf.Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
771-
status != IGraphTransformer::TStatus::Ok) {
772-
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
773-
return node;
774-
}
775-
776-
const TCoIf simplified(std::move(afterPeephole));
777-
predicate = simplified.Predicate();
778-
value = simplified.ThenValue().Cast<TCoJust>().Input();
779-
}
780-
781785
TOLAPPredicateNode predicateTree;
782786
predicateTree.ExprNode = predicate.Ptr();
783-
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
784-
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body());
787+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), false);
785788
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
786789

787-
TOLAPPredicateNode predicatesToPush, remainingPredicates;
788-
SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
789-
if (!predicatesToPush.IsValid()) {
790-
return node;
790+
auto [pushable, remaining] = SplitForPartialPushdown(predicateTree);
791+
TVector<TFilterOpsLevels> pushedPredicates;
792+
for (const auto& p: pushable) {
793+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
791794
}
792795

793-
YQL_ENSURE(predicatesToPush.IsValid(), "Predicates to push is invalid");
794-
YQL_ENSURE(remainingPredicates.IsValid(), "Remaining predicates is invalid");
796+
if constexpr (NSsa::RuntimeVersion >= 5U) {
797+
if (!remaining.empty()) {
798+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
799+
const auto recoveredOptinalIfForNonPushedDownPredicates = Build<TCoOptionalIf>(ctx, node.Pos())
800+
.Predicate(remainingPredicates.ExprNode)
801+
.Value(value)
802+
.Build();
803+
TExprNode::TPtr afterPeephole;
804+
bool hasNonDeterministicFunctions;
805+
if (const auto status = PeepHoleOptimizeNode(recoveredOptinalIfForNonPushedDownPredicates.Value().Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
806+
status != IGraphTransformer::TStatus::Ok) {
807+
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
808+
return node;
809+
}
810+
const TCoIf simplified(std::move(afterPeephole));
811+
predicate = simplified.Predicate();
812+
value = simplified.ThenValue().Cast<TCoJust>().Input();
813+
814+
TOLAPPredicateNode predicateTree;
815+
predicateTree.ExprNode = predicate.Ptr();
816+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), true);
817+
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
818+
auto [pushableWithApply, remaining2] = SplitForPartialPushdown(predicateTree);
819+
for (const auto& p: pushableWithApply) {
820+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
821+
}
822+
remaining = std::move(remaining2);
823+
}
824+
}
825+
826+
const auto& pushedFilters = TFilterOpsLevels::Merge(pushedPredicates, ctx, node.Pos());
795827

796-
const auto pushedFilters = PredicatePushdown(TExprBase(predicatesToPush.ExprNode), lambdaArg, ctx, node.Pos());
828+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
797829
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
798830
// YQL_ENSURE(pushedFilters.IsValid(), "Pushed predicate should be always valid!");
799831

800-
if (!pushedFilters.IsValid()) {
801-
return node;
802-
}
803832

804833
TMaybeNode<TExprBase> olapFilter;
805834
if (pushedFilters.FirstLevelOps.IsValid()) {

ydb/core/kqp/opt/physical/predicate_collector.cpp

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ bool ExistsCanBePushed(const TCoExists& exists, const TExprNode* lambdaArg) {
336336
return IsMemberColumn(exists.Optional(), lambdaArg);
337337
}
338338

339-
void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
339+
void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, bool allowOlapApply) {
340340
predicateTree.Children.reserve(opNode.ChildrenSize());
341341
predicateTree.CanBePushed = true;
342342
for (const auto& childNodePtr: opNode.Children()) {
@@ -345,23 +345,23 @@ void CollectChildrenPredicates(const TExprNode& opNode, TOLAPPredicateNode& pred
345345
if (const auto maybeCtor = TMaybeNode<TCoDataCtor>(child.ExprNode))
346346
child.CanBePushed = IsSupportedDataType(maybeCtor.Cast());
347347
else
348-
CollectPredicates(TExprBase(child.ExprNode), child, lambdaArg, lambdaBody);
348+
CollectPredicates(TExprBase(child.ExprNode), child, lambdaArg, lambdaBody, allowOlapApply);
349349
predicateTree.Children.emplace_back(child);
350350
predicateTree.CanBePushed &= child.CanBePushed;
351351
}
352352
}
353353

354354
}
355355

356-
void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody) {
356+
void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const TExprBase& lambdaBody, bool allowOlapApply) {
357357
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
358358
if (predicate.Maybe<TCoIf>() || predicate.Maybe<TCoJust>() || predicate.Maybe<TCoCoalesce>()) {
359-
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody);
359+
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, allowOlapApply);
360360
}
361361
}
362362

363363
if (predicate.Maybe<TCoNot>() || predicate.Maybe<TCoAnd>() || predicate.Maybe<TCoOr>() || predicate.Maybe<TCoXor>()) {
364-
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody);
364+
return CollectChildrenPredicates(predicate.Ref(), predicateTree, lambdaArg, lambdaBody, allowOlapApply);
365365
} else if (const auto maybeCoalesce = predicate.Maybe<TCoCoalesce>()) {
366366
predicateTree.CanBePushed = CoalesceCanBePushed(maybeCoalesce.Cast(), lambdaArg, lambdaBody);
367367
} else if (const auto maybeCompare = predicate.Maybe<TCoCompare>()) {
@@ -371,11 +371,7 @@ void CollectPredicates(const TExprBase& predicate, TOLAPPredicateNode& predicate
371371
} else if (const auto maybeJsonExists = predicate.Maybe<TCoJsonExists>()) {
372372
predicateTree.CanBePushed = JsonExistsCanBePushed(maybeJsonExists.Cast(), lambdaArg);
373373
} else {
374-
if constexpr (NKikimr::NSsa::RuntimeVersion >= 5U) {
375-
predicateTree.CanBePushed = AbstractTreeCanBePushed(predicate, lambdaArg);
376-
} else {
377-
predicateTree.CanBePushed = false;
378-
}
374+
predicateTree.CanBePushed = allowOlapApply ? AbstractTreeCanBePushed(predicate, lambdaArg) : false;
379375
}
380376
}
381377

ydb/core/kqp/opt/physical/predicate_collector.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ struct TOLAPPredicateNode {
1515
}
1616
};
1717

18-
void CollectPredicates(const NNodes::TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody);
18+
void CollectPredicates(const NNodes::TExprBase& predicate, TOLAPPredicateNode& predicateTree, const TExprNode* lambdaArg, const NNodes::TExprBase& lambdaBody, bool allowOlapApply);
1919

2020
}

0 commit comments

Comments
 (0)