Skip to content

Commit 356726c

Browse files
authored
Recursive CTE, runtime part (#4759)
1 parent d25e35f commit 356726c

File tree

24 files changed

+568
-2
lines changed

24 files changed

+568
-2
lines changed

ydb/library/yql/core/common_opt/yql_co_pgselect.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4154,4 +4154,123 @@ TExprNode::TPtr ExpandPgGrouping(const TExprNode::TPtr& node, TExprContext& ctx,
41544154
.Build();
41554155
}
41564156

4157+
TExprNode::TPtr ExpandPgIterate(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx) {
4158+
const bool all = node->Content().EndsWith("All");
4159+
auto init = node->HeadPtr();
4160+
init = ctx.WrapByCallableIf(!all, "ListUniq", std::move(init));
4161+
auto lambda = node->TailPtr();
4162+
const auto limit = optCtx.Types->PgIterateLimit;
4163+
auto itemArg = ctx.NewArgument(node->Pos(), "item");
4164+
auto stateArg = ctx.NewArgument(node->Pos(), "state");
4165+
auto state = ctx.Builder(node->Pos())
4166+
.Callable("Ensure")
4167+
.Add(0, stateArg)
4168+
.Callable(1, "<")
4169+
.Add(0, itemArg)
4170+
.Callable(1, "Uint32")
4171+
.Atom(0, limit)
4172+
.Seal()
4173+
.Seal()
4174+
.Callable(2, "String")
4175+
.Atom(0, "Too many CTE iterations: " + ToString(limit))
4176+
.Seal()
4177+
.Seal()
4178+
.Build();
4179+
4180+
auto currentIter = ctx.NewCallable(node->Pos(), "Nth", { state, ctx.NewAtom(node->Pos(), 1)});
4181+
auto add = ctx.Builder(node->Pos())
4182+
.Apply(lambda)
4183+
.With(0, currentIter)
4184+
.Seal()
4185+
.Build();
4186+
4187+
auto currentRes = ctx.NewCallable(node->Pos(), "Nth", { state, ctx.NewAtom(node->Pos(), 0)});
4188+
add = ctx.WrapByCallableIf(!all, "ListUniq", std::move(add));
4189+
if (!all) {
4190+
auto id = ctx.Builder(node->Pos())
4191+
.Lambda()
4192+
.Param("x")
4193+
.Arg("x")
4194+
.Seal()
4195+
.Build();
4196+
4197+
add = ctx.Builder(node->Pos())
4198+
.Callable("Filter")
4199+
.Add(0, add)
4200+
.Lambda(1)
4201+
.Param("item")
4202+
.Callable("Not")
4203+
.Callable(0, "Contains")
4204+
.Callable(0, "ToDict")
4205+
.Add(0, currentRes)
4206+
.Add(1, id)
4207+
.Add(2, id)
4208+
.List(3)
4209+
.Atom(0, "Auto", TNodeFlags::Default)
4210+
.Atom(1, "One", TNodeFlags::Default)
4211+
.Seal()
4212+
.Seal()
4213+
.Arg(1, "item")
4214+
.Seal()
4215+
.Seal()
4216+
.Seal()
4217+
.Seal()
4218+
.Build();
4219+
}
4220+
4221+
auto res = ctx.NewCallable(node->Pos(), "Extend", { currentRes, add });
4222+
auto p = ctx.NewList(node->Pos(), { res, add });
4223+
auto foldLambdaBody = ctx.NewList(node->Pos(), { p, p});
4224+
auto foldLambda = ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), { itemArg, stateArg}), std::move(foldLambdaBody));
4225+
auto foldMap = ctx.Builder(node->Pos())
4226+
.Callable("FoldMap")
4227+
.Callable(0, "ListFromRange")
4228+
.Callable(0, "Uint32")
4229+
.Atom(0, 0)
4230+
.Seal()
4231+
.Callable(1, "Uint32")
4232+
.Atom(0, limit + 1)
4233+
.Seal()
4234+
.Seal()
4235+
.List(1)
4236+
.Add(0, init)
4237+
.Add(1, init)
4238+
.Seal()
4239+
.Add(2, foldLambda)
4240+
.Seal()
4241+
.Build();
4242+
4243+
return ctx.Builder(node->Pos())
4244+
.Callable("Coalesce")
4245+
.Callable(0, "Nth")
4246+
.Callable(0, "ListLast")
4247+
.Callable(0, "ListTakeWhile")
4248+
.Callable(0, "ListExtend")
4249+
.Callable(0, "AsList")
4250+
.List(0)
4251+
.Add(0, init)
4252+
.Add(1, init)
4253+
.Seal()
4254+
.Seal()
4255+
.Add(1, foldMap)
4256+
.Seal()
4257+
.Lambda(1)
4258+
.Param("x")
4259+
.Callable("HasItems")
4260+
.Callable(0, "Nth")
4261+
.Arg(0, "x")
4262+
.Atom(1, 1)
4263+
.Seal()
4264+
.Seal()
4265+
.Seal()
4266+
.Seal()
4267+
.Seal()
4268+
.Atom(1, 0)
4269+
.Seal()
4270+
.Callable(1, "AsList")
4271+
.Seal()
4272+
.Seal()
4273+
.Build();
4274+
}
4275+
41574276
} // namespace NYql

ydb/library/yql/core/common_opt/yql_co_pgselect.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ TExprNode::TPtr ExpandPgGroupRef(const TExprNode::TPtr& node, TExprContext& ctx,
2525

2626
TExprNode::TPtr ExpandPgGrouping(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx);
2727

28+
TExprNode::TPtr ExpandPgIterate(const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& optCtx);
29+
2830
} // namespace NYql

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6257,6 +6257,8 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
62576257
};
62586258

62596259
map["PgSelect"] = &ExpandPgSelect;
6260+
map["PgIterate"] = &ExpandPgIterate;
6261+
map["PgIterateAll"] = &ExpandPgIterate;
62606262

62616263
map["PgLike"] = &ExpandPgLike;
62626264
map["PgILike"] = &ExpandPgLike;

ydb/library/yql/core/type_ann/type_ann_columnorder.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ void AddPrefix(TVector<TString>& columnOrder, const TString& prefix) {
4141

4242
IGraphTransformer::TStatus OrderForPgSetItem(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExtContext& ctx) {
4343
Y_UNUSED(output);
44+
if (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Unit) {
45+
return IGraphTransformer::TStatus::Ok;
46+
}
47+
4448
TVector<TString> columnOrder;
4549
auto result = GetSetting(node->Tail(), "result");
4650
auto emitPgStar = GetSetting(node->Tail(), "emit_pg_star");

ydb/library/yql/core/type_ann/type_ann_core.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12359,6 +12359,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
1235912359
Functions["PgGrouping"] = &PgGroupingWrapper;
1236012360
Functions["PgGroupingSet"] = &PgGroupingSetWrapper;
1236112361
Functions["PgToRecord"] = &PgToRecordWrapper;
12362+
Functions["PgIterate"] = &PgIterateWrapper;
12363+
Functions["PgIterateAll"] = &PgIterateWrapper;
1236212364
Functions["StructUnion"] = &StructMergeWrapper;
1236312365
Functions["StructIntersection"] = &StructMergeWrapper;
1236412366
Functions["StructDifference"] = &StructMergeWrapper;
@@ -12629,6 +12631,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
1262912631
ExtFunctions["AggregateMergeManyFinalize"] = &AggregateWrapper;
1263012632

1263112633
ColumnOrderFunctions["PgSetItem"] = &OrderForPgSetItem;
12634+
ColumnOrderFunctions["PgIterate"] = &OrderFromFirst;
12635+
ColumnOrderFunctions["PgIterateAll"] = &OrderFromFirst;
1263212636
ColumnOrderFunctions["AssumeColumnOrder"] = &OrderForAssumeColumnOrder;
1263312637

1263412638
ColumnOrderFunctions["SqlProject"] = ColumnOrderFunctions["OrderedSqlProject"] = &OrderForSqlProject;

ydb/library/yql/core/type_ann/type_ann_pg.cpp

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3848,8 +3848,11 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
38483848
inputStructType = ctx.Expr.MakeType<TStructExprType>(items);
38493849
columnOrder = TColumnOrder({ TString(memberName) });
38503850
}
3851-
}
3852-
else {
3851+
} else {
3852+
if (p->Head().IsCallable("PgSelf")) {
3853+
input->SetTypeAnn(ctx.Expr.MakeType<TUnitExprType>());
3854+
return IGraphTransformer::TStatus::Ok;
3855+
}
38533856
if (!EnsureListType(p->Head(), ctx.Expr)) {
38543857
return IGraphTransformer::TStatus::Error;
38553858
}
@@ -4869,6 +4872,7 @@ IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNo
48694872
TExprNode* setItems = nullptr;
48704873
TExprNode* setOps = nullptr;
48714874
bool hasSort = false;
4875+
bool hasLimit = false;
48724876

48734877
for (ui32 pass = 0; pass < 2; ++pass) {
48744878
for (auto& option : options.Children()) {
@@ -4928,6 +4932,7 @@ IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNo
49284932
setItems = &option->Tail();
49294933
}
49304934
} else if (optionName == "limit" || optionName == "offset") {
4935+
hasLimit = true;
49314936
if (pass != 0) {
49324937
continue;
49334938
}
@@ -5024,6 +5029,90 @@ IGraphTransformer::TStatus PgSelectWrapper(const TExprNode::TPtr& input, TExprNo
50245029
return IGraphTransformer::TStatus::Error;
50255030
}
50265031

5032+
const bool hasRecursive = AnyOf(setItems->Children(), [](const auto& n) {
5033+
return n->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Unit;
5034+
});
5035+
5036+
if (hasRecursive) {
5037+
bool error = false;
5038+
if (setItems->ChildrenSize() != 2) {
5039+
error = true;
5040+
} else if (setItems->Child(0)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Unit) {
5041+
error = true;
5042+
} else if (setOps->Child(2)->Content() != "union" && setOps->Child(2)->Content() != "union_all") {
5043+
error = true;
5044+
} else if (hasSort || hasLimit) {
5045+
error = true;
5046+
}
5047+
5048+
if (error) {
5049+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
5050+
"Recursive query does not have the form non-recursive-term UNION [ALL] recursive-term"));
5051+
return IGraphTransformer::TStatus::Error;
5052+
}
5053+
5054+
auto nonRecursivePart = setItems->ChildPtr(0);
5055+
auto recursivePart = setItems->ChildPtr(1);
5056+
auto tableArg = ctx.Expr.NewArgument(input->Pos(), "table");
5057+
auto order = *ctx.Types.LookupColumnOrder(*nonRecursivePart);
5058+
auto withColumnOrder = KeepColumnOrder(order, tableArg, ctx.Expr);
5059+
auto status = OptimizeExpr(recursivePart, recursivePart, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
5060+
Y_UNUSED(ctx);
5061+
if (node->IsCallable("PgSelf")) {
5062+
return withColumnOrder;
5063+
}
5064+
5065+
return node;
5066+
}, ctx.Expr, TOptimizeExprSettings(&ctx.Types));
5067+
5068+
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Error);
5069+
5070+
auto lambdaBody = ctx.Expr.Builder(input->Pos())
5071+
.Callable("PgSelect")
5072+
.List(0)
5073+
.List(0)
5074+
.Atom(0, "set_items")
5075+
.List(1)
5076+
.Add(0, recursivePart)
5077+
.Seal()
5078+
.Seal()
5079+
.List(1)
5080+
.Atom(0, "set_ops")
5081+
.List(1)
5082+
.Atom(0, "push")
5083+
.Seal()
5084+
.Seal()
5085+
.Seal()
5086+
.Seal()
5087+
.Build();
5088+
5089+
auto lambda = ctx.Expr.NewLambda(input->Pos(), ctx.Expr.NewArguments(input->Pos(), { tableArg }), std::move(lambdaBody));
5090+
5091+
output = ctx.Expr.Builder(input->Pos())
5092+
.Callable(ToString("PgIterate") + (setOps->Child(2)->Content() == "union_all" ? "All" : ""))
5093+
.Callable(0, "PgSelect")
5094+
.List(0)
5095+
.List(0)
5096+
.Atom(0, "set_items")
5097+
.List(1)
5098+
.Add(0, nonRecursivePart)
5099+
.Seal()
5100+
.Seal()
5101+
.List(1)
5102+
.Atom(0, "set_ops")
5103+
.List(1)
5104+
.Atom(0, "push")
5105+
.Seal()
5106+
.Seal()
5107+
.Seal()
5108+
.Seal()
5109+
.Add(1, lambda)
5110+
.Seal()
5111+
.Build();
5112+
5113+
return IGraphTransformer::TStatus::Repeat;
5114+
}
5115+
50275116
TColumnOrder resultColumnOrder;
50285117
const TStructExprType* resultStructType = nullptr;
50295118

@@ -5815,5 +5904,40 @@ IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExpr
58155904
return IGraphTransformer::TStatus::Ok;
58165905
}
58175906

5907+
IGraphTransformer::TStatus PgIterateWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
5908+
Y_UNUSED(output);
5909+
if (!EnsureArgsCount(*input, 2, ctx.Expr)) {
5910+
return IGraphTransformer::TStatus::Error;
5911+
}
5912+
5913+
if (!EnsureListType(input->Head(), ctx.Expr)) {
5914+
return IGraphTransformer::TStatus::Error;
5915+
}
5916+
5917+
auto& lambda = input->ChildRef(1);
5918+
const auto status = ConvertToLambda(lambda, ctx.Expr, 1);
5919+
if (status.Level != IGraphTransformer::TStatus::Ok) {
5920+
return status;
5921+
}
5922+
5923+
if (!UpdateLambdaAllArgumentsTypes(lambda, { input->Head().GetTypeAnn() }, ctx.Expr)) {
5924+
return IGraphTransformer::TStatus::Error;
5925+
}
5926+
5927+
if (!lambda->GetTypeAnn()) {
5928+
return IGraphTransformer::TStatus::Repeat;
5929+
}
5930+
5931+
if (!IsSameAnnotation(*lambda->GetTypeAnn(), *input->Head().GetTypeAnn())) {
5932+
ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(lambda->Pos()), TStringBuilder() <<
5933+
"Mismatch of transform lambda return type and input type: " <<
5934+
*lambda->GetTypeAnn() << " != " << *input->Head().GetTypeAnn()));
5935+
return IGraphTransformer::TStatus::Error;
5936+
}
5937+
5938+
input->SetTypeAnn(input->Head().GetTypeAnn());
5939+
return IGraphTransformer::TStatus::Ok;
5940+
}
5941+
58185942
} // namespace NTypeAnnImpl
58195943
}

ydb/library/yql/core/type_ann/type_ann_pg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ IGraphTransformer::TStatus PgGroupRefWrapper(const TExprNode::TPtr& input, TExpr
5757
IGraphTransformer::TStatus PgGroupingWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
5858
IGraphTransformer::TStatus PgGroupingSetWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
5959
IGraphTransformer::TStatus PgToRecordWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
60+
IGraphTransformer::TStatus PgIterateWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
6061

6162
} // namespace NTypeAnnImpl
6263
} // namespace NYql

ydb/library/yql/core/yql_type_annotation.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ struct TTypeAnnotationContext: public TThrRefBase {
251251
ui32 EvaluateForLimit = 500;
252252
ui32 EvaluateParallelForLimit = 5000;
253253
ui32 EvaluateOrderByColumnLimit = 100;
254+
ui32 PgIterateLimit = 500;
254255
bool PullUpFlatMapOverJoin = true;
255256
bool FilterPushdownOverJoinOptionalSide = false;
256257
bool DeprecatedSQL = false;

ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2258,6 +2258,28 @@
22582258
}
22592259
],
22602260
"test.test[pg-wide_top_sort--Results]": [],
2261+
"test.test[pg-with_rec_distinct-default.txt-Analyze]": [
2262+
{
2263+
"checksum": "b4dd508a329723c74293d80f0278c705",
2264+
"size": 505,
2265+
"uri": "https://{canondata_backend}/1031349/f5278948946380da3d5514360765e6ba76347c46/resource.tar.gz#test.test_pg-with_rec_distinct-default.txt-Analyze_/plan.txt"
2266+
}
2267+
],
2268+
"test.test[pg-with_rec_distinct-default.txt-Debug]": [
2269+
{
2270+
"checksum": "bf75dfccb555f932da627750b495f70f",
2271+
"size": 1327,
2272+
"uri": "https://{canondata_backend}/1031349/f5278948946380da3d5514360765e6ba76347c46/resource.tar.gz#test.test_pg-with_rec_distinct-default.txt-Debug_/opt.yql_patched"
2273+
}
2274+
],
2275+
"test.test[pg-with_rec_distinct-default.txt-Plan]": [
2276+
{
2277+
"checksum": "b4dd508a329723c74293d80f0278c705",
2278+
"size": 505,
2279+
"uri": "https://{canondata_backend}/1031349/f5278948946380da3d5514360765e6ba76347c46/resource.tar.gz#test.test_pg-with_rec_distinct-default.txt-Plan_/plan.txt"
2280+
}
2281+
],
2282+
"test.test[pg-with_rec_distinct-default.txt-Results]": [],
22612283
"test.test[pg_catalog-pg_auth_members-default.txt-Analyze]": [
22622284
{
22632285
"checksum": "c1f2d837c3623c81dd596a9877913fb8",

0 commit comments

Comments
 (0)