@@ -15,6 +15,33 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p
15
15
return Build<TCoAtomList>(ctx, pos).Add (columnsVector).Done ();
16
16
}
17
17
18
+ template <typename Container>
19
+ TExprBase SelectFields (TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) {
20
+ TVector<TExprBase> items;
21
+ for (auto && field : fields) {
22
+ TString name;
23
+
24
+ if constexpr (std::is_same_v<NYql::NNodes::TCoAtom&&, decltype (field)>) {
25
+ name = field.Value ();
26
+ } else {
27
+ name = field;
28
+ }
29
+
30
+ auto tuple = Build<TCoNameValueTuple>(ctx, pos)
31
+ .Name ().Build (field)
32
+ .template Value <TCoMember>()
33
+ .Struct (node)
34
+ .Name ().Build (name)
35
+ .Build ()
36
+ .Done ();
37
+
38
+ items.emplace_back (tuple);
39
+ }
40
+ return Build<TCoAsStruct>(ctx, pos)
41
+ .Add (items)
42
+ .Done ();
43
+ }
44
+
18
45
TExprBase KqpBuildReturning (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
19
46
auto maybeReturning = node.Maybe <TKqlReturningList>();
20
47
if (!maybeReturning) {
@@ -24,46 +51,60 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
24
51
auto returning = maybeReturning.Cast ();
25
52
const auto & tableDesc = kqpCtx.Tables ->ExistingTable (kqpCtx.Cluster , returning.Table ().Path ());
26
53
27
- auto buildFromUpsert = [&](TMaybeNode<TKqlUpsertRows> upsert) -> TExprBase {
28
- auto rows = upsert.Cast ().Input ();
29
- auto pos = upsert.Input ().Cast ().Pos ();
54
+ auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase {
55
+ auto pos = rows.Pos ();
30
56
31
57
TSet<TString> inputColumns;
32
58
TSet<TString> columnsToReadSet;
33
-
34
- for (auto && column : upsert.Columns ().Cast ()) {
59
+ for (auto && column : columns) {
35
60
inputColumns.insert (TString (column.Value ()));
36
61
}
37
- for (auto && column : upsert. ReturningColumns (). Cast () ) {
62
+ for (auto && column : returningColumns ) {
38
63
if (!inputColumns.contains (column) && !tableDesc.GetKeyColumnIndex (TString (column))) {
39
64
columnsToReadSet.insert (TString (column));
40
65
}
41
66
}
42
-
43
- TMaybeNode<TExprBase> input = upsert.Input ();
67
+ TMaybeNode<TExprBase> input = rows;
44
68
45
69
if (!columnsToReadSet.empty ()) {
46
- TString upsertInputName = " upsertInput" ;
47
- TString tableInputName = " table" ;
70
+ auto payloadSelectorArg = TCoArgument (ctx.NewArgument (pos, " payload_selector_row" ));
71
+ TVector<TExprBase> payloadTuples;
72
+ for (const auto & column : columns) {
73
+ payloadTuples.emplace_back (
74
+ Build<TCoNameValueTuple>(ctx, pos)
75
+ .Name (column)
76
+ .Value <TCoMember>()
77
+ .Struct (payloadSelectorArg)
78
+ .Name (column)
79
+ .Build ()
80
+ .Done ());
81
+ }
82
+
83
+ auto payloadSelector = Build<TCoLambda>(ctx, pos)
84
+ .Args ({payloadSelectorArg})
85
+ .Body <TCoAsStruct>()
86
+ .Add (payloadTuples)
87
+ .Build ()
88
+ .Done ();
48
89
49
- auto payloadSelector = MakeRowsPayloadSelector (upsert.Columns ().Cast (), tableDesc, pos, ctx);
50
90
auto condenseResult = CondenseInputToDictByPk (input.Cast (), tableDesc, payloadSelector, ctx);
51
91
if (!condenseResult) {
52
92
return node;
53
93
}
54
94
55
95
auto inputDictAndKeys = PrecomputeDictAndKeys (*condenseResult, pos, ctx);
56
-
57
- TSet<TString> columnsToLookup = columnsToReadSet;
58
96
for (auto && column : tableDesc.Metadata ->KeyColumnNames ) {
59
97
columnsToReadSet.insert (column);
60
98
}
61
-
99
+ TSet<TString> columnsToLookup = columnsToReadSet;
62
100
for (auto && column : tableDesc.Metadata ->KeyColumnNames ) {
63
101
columnsToReadSet.erase (column);
64
102
}
65
103
TCoAtomList additionalColumnsToRead = MakeColumnsList (columnsToReadSet, ctx, pos);
66
104
105
+ TCoArgument existingRow = Build<TCoArgument>(ctx, node.Pos ())
106
+ .Name (" existing_row" )
107
+ .Done ();
67
108
auto prepareUpdateStage = Build<TDqStage>(ctx, pos)
68
109
.Inputs ()
69
110
.Add (inputDictAndKeys.KeysPrecompute )
@@ -80,27 +121,21 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
80
121
.Columns (MakeColumnsList (columnsToLookup, ctx, pos))
81
122
.Build ()
82
123
.Lambda ()
83
- .Args ({" existingRow" })
124
+ .Args ({existingRow})
84
125
.Body <TCoJust>()
85
126
.Input <TCoFlattenMembers>()
86
127
.Add ()
87
128
.Name ().Build (" " )
88
129
.Value <TCoUnwrap>() // Key should always exist in the dict
89
130
.Optional <TCoLookup>()
90
131
.Collection (" dict" )
91
- .Lookup <TCoExtractMembers>()
92
- .Input (" existingRow" )
93
- .Members (MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, pos))
94
- .Build ()
132
+ .Lookup (SelectFields (existingRow, tableDesc.Metadata ->KeyColumnNames , ctx, pos))
95
133
.Build ()
96
134
.Build ()
97
135
.Build ()
98
136
.Add ()
99
137
.Name ().Build (" " )
100
- .Value <TCoExtractMembers>()
101
- .Input (" existingRow" )
102
- .Members (additionalColumnsToRead)
103
- .Build ()
138
+ .Value (SelectFields (existingRow, additionalColumnsToRead, ctx, pos))
104
139
.Build ()
105
140
.Build ()
106
141
.Build ()
@@ -128,20 +163,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
128
163
for (auto item : maybeList.Cast ()) {
129
164
if (auto upsert = item.Maybe <TKqlUpsertRows>()) {
130
165
if (upsert.Cast ().Table ().Raw () == returning.Table ().Raw ()) {
131
- return buildFromUpsert (upsert);
166
+ return buildReturningRows (upsert.Input ().Cast (), upsert.Columns ().Cast (), returning.Columns ());
167
+ }
168
+ }
169
+ if (auto del = item.Maybe <TKqlDeleteRows>()) {
170
+ if (del.Cast ().Table ().Raw () == returning.Table ().Raw ()) {
171
+ return buildReturningRows (del.Input ().Cast (), MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, node.Pos ()), returning.Columns ());
132
172
}
133
173
}
134
174
}
135
175
}
136
176
137
177
if (auto upsert = returning.Update ().Maybe <TKqlUpsertRows>()) {
138
- return buildFromUpsert (upsert);
178
+ return buildReturningRows (upsert.Input ().Cast (), upsert.Columns ().Cast (), returning.Columns ());
179
+ }
180
+ if (auto del = returning.Update ().Maybe <TKqlDeleteRows>()) {
181
+ return buildReturningRows (del.Input ().Cast (), MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, node.Pos ()), returning.Columns ());
139
182
}
140
183
141
184
return node;
142
185
}
143
186
144
-
145
187
TExprBase KqpRewriteReturningUpsert (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
146
188
auto upsert = node.Cast <TKqlUpsertRows>();
147
189
if (upsert.ReturningColumns ().Empty ()) {
@@ -164,4 +206,24 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq
164
206
.Done ();
165
207
}
166
208
209
+ TExprBase KqpRewriteReturningDelete (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
210
+ auto del = node.Cast <TKqlDeleteRows>();
211
+ if (del.ReturningColumns ().Empty ()) {
212
+ return node;
213
+ }
214
+
215
+ if (!del.Input ().Maybe <TDqPrecompute>() && !del.Input ().Maybe <TDqPhyPrecompute>()) {
216
+ return node;
217
+ }
218
+
219
+ return
220
+ Build<TKqlDeleteRows>(ctx, del.Pos ())
221
+ .Input <TDqPrecompute>()
222
+ .Input (del.Input ())
223
+ .Build ()
224
+ .Table (del.Table ())
225
+ .ReturningColumns <TCoAtomList>().Build ()
226
+ .Done ();
227
+ }
228
+
167
229
} // namespace NKikimr::NKqp::NOpt
0 commit comments