@@ -15,33 +15,6 @@ TCoAtomList MakeColumnsList(Container rows, TExprContext& ctx, TPositionHandle p
15
15
return Build<TCoAtomList>(ctx, pos).Add (columnsVector).Done ();
16
16
}
17
17
18
- template <typename Container>
19
- TExprBase SelectFields (TExprBase node, Container fields, TExprContext& ctx, TPositionHandle pos) {
20
- TVector<TExprBase> items;
21
- for (auto && field : fields) {
22
- TString name;
23
-
24
- if constexpr (std::is_same_v<NYql::NNodes::TCoAtom&&, decltype (field)>) {
25
- name = field.Value ();
26
- } else {
27
- name = field;
28
- }
29
-
30
- auto tuple = Build<TCoNameValueTuple>(ctx, pos)
31
- .Name ().Build (field)
32
- .template Value <TCoMember>()
33
- .Struct (node)
34
- .Name ().Build (name)
35
- .Build ()
36
- .Done ();
37
-
38
- items.emplace_back (tuple);
39
- }
40
- return Build<TCoAsStruct>(ctx, pos)
41
- .Add (items)
42
- .Done ();
43
- }
44
-
45
18
TExprBase KqpBuildReturning (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
46
19
auto maybeReturning = node.Maybe <TKqlReturningList>();
47
20
if (!maybeReturning) {
@@ -51,60 +24,46 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
51
24
auto returning = maybeReturning.Cast ();
52
25
const auto & tableDesc = kqpCtx.Tables ->ExistingTable (kqpCtx.Cluster , returning.Table ().Path ());
53
26
54
- auto buildReturningRows = [&](TExprBase rows, TCoAtomList columns, TCoAtomList returningColumns) -> TExprBase {
55
- auto pos = rows.Pos ();
27
+ auto buildFromUpsert = [&](TMaybeNode<TKqlUpsertRows> upsert) -> TExprBase {
28
+ auto rows = upsert.Cast ().Input ();
29
+ auto pos = upsert.Input ().Cast ().Pos ();
56
30
57
31
TSet<TString> inputColumns;
58
32
TSet<TString> columnsToReadSet;
59
- for (auto && column : columns) {
33
+
34
+ for (auto && column : upsert.Columns ().Cast ()) {
60
35
inputColumns.insert (TString (column.Value ()));
61
36
}
62
- for (auto && column : returningColumns ) {
37
+ for (auto && column : upsert. ReturningColumns (). Cast () ) {
63
38
if (!inputColumns.contains (column) && !tableDesc.GetKeyColumnIndex (TString (column))) {
64
39
columnsToReadSet.insert (TString (column));
65
40
}
66
41
}
67
- TMaybeNode<TExprBase> input = rows;
68
42
69
- if (!columnsToReadSet.empty ()) {
70
- auto payloadSelectorArg = TCoArgument (ctx.NewArgument (pos, " payload_selector_row" ));
71
- TVector<TExprBase> payloadTuples;
72
- for (const auto & column : columns) {
73
- payloadTuples.emplace_back (
74
- Build<TCoNameValueTuple>(ctx, pos)
75
- .Name (column)
76
- .Value <TCoMember>()
77
- .Struct (payloadSelectorArg)
78
- .Name (column)
79
- .Build ()
80
- .Done ());
81
- }
43
+ TMaybeNode<TExprBase> input = upsert.Input ();
82
44
83
- auto payloadSelector = Build<TCoLambda>(ctx, pos)
84
- .Args ({payloadSelectorArg})
85
- .Body <TCoAsStruct>()
86
- .Add (payloadTuples)
87
- .Build ()
88
- .Done ();
45
+ if (!columnsToReadSet.empty ()) {
46
+ TString upsertInputName = " upsertInput" ;
47
+ TString tableInputName = " table" ;
89
48
49
+ auto payloadSelector = MakeRowsPayloadSelector (upsert.Columns ().Cast (), tableDesc, pos, ctx);
90
50
auto condenseResult = CondenseInputToDictByPk (input.Cast (), tableDesc, payloadSelector, ctx);
91
51
if (!condenseResult) {
92
52
return node;
93
53
}
94
54
95
55
auto inputDictAndKeys = PrecomputeDictAndKeys (*condenseResult, pos, ctx);
56
+
57
+ TSet<TString> columnsToLookup = columnsToReadSet;
96
58
for (auto && column : tableDesc.Metadata ->KeyColumnNames ) {
97
59
columnsToReadSet.insert (column);
98
60
}
99
- TSet<TString> columnsToLookup = columnsToReadSet;
61
+
100
62
for (auto && column : tableDesc.Metadata ->KeyColumnNames ) {
101
63
columnsToReadSet.erase (column);
102
64
}
103
65
TCoAtomList additionalColumnsToRead = MakeColumnsList (columnsToReadSet, ctx, pos);
104
66
105
- TCoArgument existingRow = Build<TCoArgument>(ctx, node.Pos ())
106
- .Name (" existing_row" )
107
- .Done ();
108
67
auto prepareUpdateStage = Build<TDqStage>(ctx, pos)
109
68
.Inputs ()
110
69
.Add (inputDictAndKeys.KeysPrecompute )
@@ -121,21 +80,27 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
121
80
.Columns (MakeColumnsList (columnsToLookup, ctx, pos))
122
81
.Build ()
123
82
.Lambda ()
124
- .Args ({existingRow})
83
+ .Args ({" existingRow" })
125
84
.Body <TCoJust>()
126
85
.Input <TCoFlattenMembers>()
127
86
.Add ()
128
87
.Name ().Build (" " )
129
88
.Value <TCoUnwrap>() // Key should always exist in the dict
130
89
.Optional <TCoLookup>()
131
90
.Collection (" dict" )
132
- .Lookup (SelectFields (existingRow, tableDesc.Metadata ->KeyColumnNames , ctx, pos))
91
+ .Lookup <TCoExtractMembers>()
92
+ .Input (" existingRow" )
93
+ .Members (MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, pos))
94
+ .Build ()
133
95
.Build ()
134
96
.Build ()
135
97
.Build ()
136
98
.Add ()
137
99
.Name ().Build (" " )
138
- .Value (SelectFields (existingRow, additionalColumnsToRead, ctx, pos))
100
+ .Value <TCoExtractMembers>()
101
+ .Input (" existingRow" )
102
+ .Members (additionalColumnsToRead)
103
+ .Build ()
139
104
.Build ()
140
105
.Build ()
141
106
.Build ()
@@ -163,27 +128,20 @@ TExprBase KqpBuildReturning(TExprBase node, TExprContext& ctx, const TKqpOptimiz
163
128
for (auto item : maybeList.Cast ()) {
164
129
if (auto upsert = item.Maybe <TKqlUpsertRows>()) {
165
130
if (upsert.Cast ().Table ().Raw () == returning.Table ().Raw ()) {
166
- return buildReturningRows (upsert.Input ().Cast (), upsert.Columns ().Cast (), returning.Columns ());
167
- }
168
- }
169
- if (auto del = item.Maybe <TKqlDeleteRows>()) {
170
- if (del.Cast ().Table ().Raw () == returning.Table ().Raw ()) {
171
- return buildReturningRows (del.Input ().Cast (), MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, node.Pos ()), returning.Columns ());
131
+ return buildFromUpsert (upsert);
172
132
}
173
133
}
174
134
}
175
135
}
176
136
177
137
if (auto upsert = returning.Update ().Maybe <TKqlUpsertRows>()) {
178
- return buildReturningRows (upsert.Input ().Cast (), upsert.Columns ().Cast (), returning.Columns ());
179
- }
180
- if (auto del = returning.Update ().Maybe <TKqlDeleteRows>()) {
181
- return buildReturningRows (del.Input ().Cast (), MakeColumnsList (tableDesc.Metadata ->KeyColumnNames , ctx, node.Pos ()), returning.Columns ());
138
+ return buildFromUpsert (upsert);
182
139
}
183
140
184
141
return node;
185
142
}
186
143
144
+
187
145
TExprBase KqpRewriteReturningUpsert (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
188
146
auto upsert = node.Cast <TKqlUpsertRows>();
189
147
if (upsert.ReturningColumns ().Empty ()) {
@@ -206,24 +164,4 @@ TExprBase KqpRewriteReturningUpsert(TExprBase node, TExprContext& ctx, const TKq
206
164
.Done ();
207
165
}
208
166
209
- TExprBase KqpRewriteReturningDelete (TExprBase node, TExprContext& ctx, const TKqpOptimizeContext&) {
210
- auto del = node.Cast <TKqlDeleteRows>();
211
- if (del.ReturningColumns ().Empty ()) {
212
- return node;
213
- }
214
-
215
- if (!del.Input ().Maybe <TDqPrecompute>() && !del.Input ().Maybe <TDqPhyPrecompute>()) {
216
- return node;
217
- }
218
-
219
- return
220
- Build<TKqlDeleteRows>(ctx, del.Pos ())
221
- .Input <TDqPrecompute>()
222
- .Input (del.Input ())
223
- .Build ()
224
- .Table (del.Table ())
225
- .ReturningColumns <TCoAtomList>().Build ()
226
- .Done ();
227
- }
228
-
229
167
} // namespace NKikimr::NKqp::NOpt
0 commit comments