Skip to content

Commit 7d6178d

Browse files
authored
Implement returning list for KiWrite/UpdateTable KIKIMR-18531
1 parent 15d4982 commit 7d6178d

21 files changed

+537
-133
lines changed

ydb/core/kqp/expr_nodes/kqp_expr_nodes.json

+19-5
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,16 @@
224224
"Match": {"Type": "CallableBase"},
225225
"Builder": {"Generate": "None"}
226226
},
227+
{
228+
"Name": "TKqlReturningList",
229+
"Base": "TCallable",
230+
"Match": {"Type": "Callable", "Name": "KqlReturningList"},
231+
"Children": [
232+
{"Index": 0, "Name": "Update", "Type": "TExprBase"},
233+
{"Index": 1, "Name": "Columns", "Type": "TCoAtomList"},
234+
{"Index": 2, "Name": "Table", "Type": "TKqpTable"}
235+
]
236+
},
227237
{
228238
"Name": "TKqlTableEffect",
229239
"Base": "TKqlEffectBase",
@@ -247,15 +257,17 @@
247257
"Base": "TKqlUpsertRowsBase",
248258
"Match": {"Type": "Callable", "Name": "KqlUpsertRows"},
249259
"Children": [
250-
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
260+
{"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"},
261+
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
251262
]
252263
},
253264
{
254265
"Name": "TKqlUpsertRowsIndex",
255266
"Base": "TKqlUpsertRowsBase",
256267
"Match": {"Type": "Callable", "Name": "KqlUpsertRowsIndex"},
257268
"Children": [
258-
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
269+
{"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"},
270+
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
259271
]
260272
},
261273
{
@@ -282,7 +294,8 @@
282294
"Children": [
283295
{"Index": 1, "Name": "Input", "Type": "TExprBase"},
284296
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
285-
{"Index": 3, "Name": "OnConflict", "Type": "TCoAtom", "Description": "'abort or 'revert"}
297+
{"Index": 3, "Name": "OnConflict", "Type": "TCoAtom", "Description": "'abort or 'revert"},
298+
{"Index": 4, "Name": "ReturningColumns", "Type": "TCoAtomList"}
286299
]
287300
},
288301
{
@@ -291,7 +304,8 @@
291304
"Match": {"Type": "CallableBase"},
292305
"Children": [
293306
{"Index": 1, "Name": "Input", "Type": "TExprBase"},
294-
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}
307+
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
308+
{"Index": 3, "Name": "ReturningColumns", "Type": "TCoAtomList"}
295309
]
296310
},
297311
{
@@ -304,7 +318,7 @@
304318
"Base": "TKqlUpdateRowsBase",
305319
"Match": {"Type": "Callable", "Name": "TKqlUpdateRowsIndex"},
306320
"Children": [
307-
{"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
321+
{"Index": 4, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
308322
]
309323
},
310324
{

ydb/core/kqp/host/kqp_type_ann.cpp

+34-2
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,34 @@ bool CalcKeyColumnsCount(TExprContext& ctx, const TPositionHandle pos, const TSt
196196
return true;
197197
}
198198

199+
TStatus AnnotateReturningList(
200+
const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
201+
const TKikimrTablesData& tablesData, bool withSystemColumns)
202+
{
203+
if (!EnsureArgsCount(*node, 3, ctx)) {
204+
return TStatus::Error;
205+
}
206+
207+
const auto& columns = node->ChildPtr(TKqlReturningList::idx_Columns);
208+
if (!EnsureTupleOfAtoms(*columns, ctx)) {
209+
return TStatus::Error;
210+
}
211+
212+
auto table = ResolveTable(node->Child(TKqlReturningList::idx_Table), ctx, cluster, tablesData);
213+
if (!table.second) {
214+
return TStatus::Error;
215+
}
216+
217+
auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, TCoAtomList(columns), withSystemColumns);
218+
if (!rowType) {
219+
return TStatus::Error;
220+
}
221+
222+
node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
223+
224+
return TStatus::Ok;
225+
}
226+
199227
TStatus AnnotateReadTable(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
200228
const TKikimrTablesData& tablesData, bool withSystemColumns)
201229
{
@@ -613,7 +641,7 @@ TStatus AnnotateUpsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const
613641
TStatus AnnotateInsertRows(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
614642
const TKikimrTablesData& tablesData)
615643
{
616-
if (!EnsureArgsCount(*node, 4, ctx)) {
644+
if (!EnsureArgsCount(*node, 5, ctx)) {
617645
return TStatus::Error;
618646
}
619647

@@ -694,7 +722,7 @@ TStatus AnnotateUpdateRows(const TExprNode::TPtr& node, TExprContext& ctx, const
694722
return TStatus::Error;
695723
}
696724

697-
if (!EnsureMaxArgsCount(*node, 4, ctx)) {
725+
if (!EnsureMaxArgsCount(*node, 5, ctx)) {
698726
return TStatus::Error;
699727
}
700728

@@ -1906,6 +1934,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
19061934
return AnnotateKqpSinkEffect(input, ctx);
19071935
}
19081936

1937+
if (TKqlReturningList::Match(input.Get())) {
1938+
return AnnotateReturningList(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
1939+
}
1940+
19091941
return dqTransformer->Transform(input, output, ctx);
19101942
});
19111943
}

ydb/core/kqp/opt/kqp_opt_effects.cpp

+32-25
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,22 @@ bool BuildEffects(TPositionHandle pos, const TVector<TExprBase>& effects,
468468
return true;
469469
}
470470

471+
template<typename Visitor>
472+
bool ExploreEffectLists(TExprBase expr, Visitor visitor) {
473+
if (auto list = expr.Maybe<TExprList>()) {
474+
for (auto&& item : list.Cast()) {
475+
if (!ExploreEffectLists(item, visitor)) {
476+
return false;
477+
}
478+
}
479+
} else {
480+
if (!visitor(expr)) {
481+
return false;
482+
}
483+
}
484+
return true;
485+
}
486+
471487
template <bool GroupEffectsByTable>
472488
TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
473489
const TKqpOptimizeContext& kqpCtx)
@@ -476,21 +492,16 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
476492

477493
if constexpr (GroupEffectsByTable) {
478494
TMap<TStringBuf, TVector<TExprBase>> tableEffectsMap;
479-
for (const auto& maybeEffect : query.Effects()) {
480-
if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
481-
for (const auto effect : maybeList.Cast()) {
482-
YQL_ENSURE(effect.Maybe<TKqlTableEffect>());
483-
auto tableEffect = effect.Cast<TKqlTableEffect>();
495+
ExploreEffectLists(
496+
query.Effects(),
497+
[&](TExprBase effect) {
498+
auto tableEffect = effect.Maybe<TKqlTableEffect>();
499+
YQL_ENSURE(tableEffect);
484500

485-
tableEffectsMap[tableEffect.Table().Path()].push_back(effect);
486-
}
487-
} else {
488-
YQL_ENSURE(maybeEffect.Maybe<TKqlTableEffect>());
489-
auto tableEffect = maybeEffect.Cast<TKqlTableEffect>();
501+
tableEffectsMap[tableEffect.Cast().Table().Path()].push_back(effect);
490502

491-
tableEffectsMap[tableEffect.Table().Path()].push_back(maybeEffect);
492-
}
493-
}
503+
return true;
504+
});
494505

495506
for (const auto& pair: tableEffectsMap) {
496507
if (!BuildEffects(query.Pos(), pair.second, ctx, kqpCtx, builtEffects)) {
@@ -500,18 +511,14 @@ TMaybeNode<TKqlQuery> BuildEffects(const TKqlQuery& query, TExprContext& ctx,
500511
} else {
501512
builtEffects.reserve(query.Effects().Size() * 2);
502513

503-
for (const auto& maybeEffect : query.Effects()) {
504-
if (const auto maybeList = maybeEffect.Maybe<TExprList>()) {
505-
for (const auto effect : maybeList.Cast()) {
506-
if (!BuildEffects(query.Pos(), {effect}, ctx, kqpCtx, builtEffects)) {
507-
return {};
508-
}
509-
}
510-
} else {
511-
if (!BuildEffects(query.Pos(), {maybeEffect}, ctx, kqpCtx, builtEffects)) {
512-
return {};
513-
}
514-
}
514+
auto result = ExploreEffectLists(
515+
query.Effects(),
516+
[&](TExprBase effect) {
517+
return BuildEffects(query.Pos(), {effect}, ctx, kqpCtx, builtEffects);
518+
});
519+
520+
if (!result) {
521+
return {};
515522
}
516523
}
517524

0 commit comments

Comments
 (0)