Skip to content

Commit 9a63fbd

Browse files
committed
Split pushdown for different callable kinds
1 parent 38b28a9 commit 9a63fbd

File tree

3 files changed

+69
-39
lines changed

3 files changed

+69
-39
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 49 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -685,45 +685,40 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, const TExprNode&
685685
}
686686

687687
TOLAPPredicateNode WrapPredicates(const std::vector<TOLAPPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
688+
TOLAPPredicateNode result;
688689
if (predicates.empty()) {
689-
return {};
690-
}
691-
692-
if (const auto predicatesSize = predicates.size(); 1U == predicatesSize) {
693-
return predicates.front();
690+
result.ExprNode = MakeBool<true>(pos, ctx);;
691+
} else if (predicates.size() == 1) {
692+
result = predicates.front();
694693
} else {
695694
TOLAPPredicateNode result;
696695
result.Children = predicates;
697696
result.CanBePushed = true;
698697

699698
TVector<NNodes::TExprBase> exprNodes;
700-
exprNodes.reserve(predicatesSize);
699+
exprNodes.reserve(predicates.size());
701700
for (const auto& pred : predicates) {
702701
exprNodes.emplace_back(pred.ExprNode);
703702
result.CanBePushed &= pred.CanBePushed;
704703
}
705704
result.ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
706705
.Add(exprNodes)
707706
.Done().Ptr();
708-
return result;
709707
}
708+
return result;
710709
}
711710

712-
void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredicateNode& predicatesToPush, TOLAPPredicateNode& remainingPredicates,
713-
TExprContext& ctx, TPositionHandle pos)
711+
std::pair<std::vector<TOLAPPredicateNode>, std::vector<TOLAPPredicateNode>> SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree)
714712
{
715713
if (predicateTree.CanBePushed) {
716-
predicatesToPush = predicateTree;
717-
remainingPredicates.ExprNode = MakeBool<true>(pos, ctx);
718-
return;
714+
return {{predicateTree}, {}};
719715
}
720716

721717
if (!TCoAnd::Match(predicateTree.ExprNode.Get())) {
722718
// We can partially push down predicates from an AND operator only.
723719
// For an OR operator we would need several read operators, which is not acceptable.
724720
// TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
725-
remainingPredicates = predicateTree;
726-
return;
721+
return {{}, {predicateTree}};
727722
}
728723

729724
bool isFoundNotStrictOp = false;
@@ -738,8 +733,7 @@ void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredi
738733
remaining.emplace_back(predicate);
739734
}
740735
}
741-
predicatesToPush = WrapPredicates(pushable, ctx, pos);
742-
remainingPredicates = WrapPredicates(remaining, ctx, pos);
736+
return {pushable, remaining};
743737
}
744738

745739
} // anonymous namespace end
@@ -763,6 +757,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
763757
}
764758

765759
const auto& lambda = flatmap.Lambda();
760+
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
766761

767762
YQL_CLOG(TRACE, ProviderKqp) << "Initial OLAP lambda: " << KqpExprToPrettyString(lambda, ctx);
768763

@@ -775,35 +770,53 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
775770
auto predicate = optionaIf.Predicate();
776771
auto value = optionaIf.Value();
777772

778-
if constexpr (NSsa::RuntimeVersion >= 5U) {
779-
TExprNode::TPtr afterPeephole;
780-
bool hasNonDeterministicFunctions;
781-
if (const auto status = PeepHoleOptimizeNode(optionaIf.Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
782-
status != IGraphTransformer::TStatus::Ok) {
783-
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
784-
return node;
785-
}
786-
787-
const TCoIf simplified(std::move(afterPeephole));
788-
predicate = simplified.Predicate();
789-
value = simplified.ThenValue().Cast<TCoJust>().Input();
790-
}
791-
792773
TOLAPPredicateNode predicateTree;
793774
predicateTree.ExprNode = predicate.Ptr();
794-
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
795775
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body());
796776
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
797777

798-
TOLAPPredicateNode predicatesToPush, remainingPredicates;
799-
SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
800-
if (!predicatesToPush.IsValid()) {
778+
auto [pushable, remaining] = SplitForPartialPushdown(predicateTree);
779+
780+
if constexpr (NSsa::RuntimeVersion >= 5U) {
781+
if (!remaining.empty()) {
782+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
783+
const auto recoveredOptinalIfForNonPushedDownPredicates = Build<TCoOptionalIf>(ctx, node.Pos())
784+
.Predicate<TCoLambda>()
785+
.Args(lambda.Args())
786+
.Body(remainingPredicates.ExprNode)
787+
.Build()
788+
.Build();
789+
TExprNode::TPtr afterPeephole;
790+
bool hasNonDeterministicFunctions;
791+
if (const auto status = PeepHoleOptimizeNode(optionaIf.Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
792+
status != IGraphTransformer::TStatus::Ok) {
793+
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
794+
return node;
795+
}
796+
const TCoIf simplified(std::move(afterPeephole));
797+
predicate = simplified.Predicate();
798+
value = simplified.ThenValue().Cast<TCoJust>().Input();
799+
800+
TOLAPPredicateNode predicateTree;
801+
predicateTree.ExprNode = predicate.Ptr();
802+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body());
803+
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
804+
805+
auto [pushable2, remaining2] = SplitForPartialPushdown(predicateTree);
806+
for (auto& p: pushable2) {
807+
pushable.emplace_back(std::move(p));
808+
}
809+
remaining = std::move(remaining2);
810+
}
811+
}
812+
813+
if (pushable.empty()) {
801814
return node;
802815
}
803816

804-
YQL_ENSURE(predicatesToPush.IsValid(), "Predicates to push is invalid");
805-
YQL_ENSURE(remainingPredicates.IsValid(), "Remaining predicates is invalid");
806817

818+
const auto predicatesToPush = WrapPredicates(pushable, ctx, node.Pos());
819+
const auto remainingPredicates = WrapPredicates(remaining, ctx, node.Pos());
807820
const auto pushedFilters = PredicatePushdown(TExprBase(predicatesToPush.ExprNode), lambdaArg, ctx, node.Pos());
808821
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
809822
// YQL_ENSURE(pushedFilters.IsValid(), "Pushed predicate should be always valid!");

ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ const TTypedColumn CompileJsonExists(const TKqpOlapJsonExists& jsonExistsCallabl
450450
jsonExistsCallable.Column(),
451451
jsonExistsCallable.Path(),
452452
type);
453+
jsonExistsFunc->SetKernelName("JsonExists");
453454
jsonExistsFunc->SetKernelIdx(idx);
454455

455456
if constexpr (NSsa::RuntimeVersion >= 4U) {

ydb/core/kqp/ut/olap/aggregations_ut.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,24 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
12861286
TestTableWithNulls({testCase});
12871287
}
12881288

1289+
Y_UNIT_TEST(Json_ValueAndLike) {
1290+
TAggregationTestCase testCase;
1291+
testCase.SetQuery(R"(
1292+
SELECT id FROM `/Root/tableWithNulls`
1293+
WHERE
1294+
-- (JSON_VALUE(jsonval, "$.labels.http_status_code") = '500')
1295+
-- AND JSON_EXISTS(jsonval, "$.meta.exception.stacktrace")
1296+
--AND
1297+
(JSON_VALUE(jsonval, "$.meta.uri") like "%unified%")
1298+
)")
1299+
.AddExpectedPlanOptions("KqpOlapFilter");
1300+
// .SetExpectedReply(R"([[1;["val1"];#]])");
1301+
1302+
TestTableWithNulls({testCase});
1303+
}
1304+
1305+
1306+
12891307
Y_UNIT_TEST(Json_GetValue_Minus) {
12901308
TAggregationTestCase testCase;
12911309
testCase.SetQuery(R"(
@@ -1310,9 +1328,7 @@ Y_UNIT_TEST_SUITE(KqpOlapAggregations) {
13101328
SELECT id, JSON_VALUE(jsonval, "$.col1" RETURNING String), JSON_VALUE(jsondoc, "$.col1") FROM `/Root/tableWithNulls`
13111329
WHERE JSON_VALUE(jsonval, "$.col1" RETURNING String) = "val1" AND id = 1;
13121330
)")
1313-
#if SSA_RUNTIME_VERSION >= 5U
1314-
.AddExpectedPlanOptions("KqpOlapApply")
1315-
#elif SSA_RUNTIME_VERSION >= 3U
1331+
#if SSA_RUNTIME_VERSION >= 3U
13161332
.AddExpectedPlanOptions("KqpOlapJsonValue")
13171333
#else
13181334
.AddExpectedPlanOptions("Udf")

0 commit comments

Comments
 (0)