@@ -222,6 +222,115 @@ TStatus KqpDuplicateResults(const TExprNode::TPtr& input, TExprNode::TPtr& outpu
222
222
return TStatus::Ok;
223
223
}
224
224
225
+ template <typename TExpr>
226
+ TVector<TExpr> CollectNodes (const TExprNode::TPtr& input) {
227
+ TVector<TExpr> result;
228
+
229
+ VisitExpr (input, [&result](const TExprNode::TPtr& node) {
230
+ if (TExpr::Match (node.Get ())) {
231
+ result.emplace_back (TExpr (node));
232
+ }
233
+ return true ;
234
+ });
235
+
236
+ return result;
237
+ }
238
+
239
+ bool FindPrecomputedOutputs (TDqStageBase stage, const TParentsMap& parentsMap) {
240
+ auto outIt = parentsMap.find (stage.Raw ());
241
+ if (outIt == parentsMap.end ()) {
242
+ return false ;
243
+ }
244
+
245
+ for (auto & output : outIt->second ) {
246
+ if (TDqOutput::Match (output)) {
247
+ auto connIt = parentsMap.find (output);
248
+ if (connIt != parentsMap.end ()) {
249
+ for (auto maybeConn : connIt->second ) {
250
+ auto parentIt = parentsMap.find (maybeConn);
251
+ if (parentIt != parentsMap.end ()) {
252
+ for (auto & parent : parentIt->second ) {
253
+ if (TDqPrecompute::Match (parent) || TDqPhyPrecompute::Match (parent)) {
254
+ return true ;
255
+ }
256
+ }
257
+ }
258
+ }
259
+ }
260
+ }
261
+ }
262
+
263
+ return false ;
264
+ }
265
+
266
+
267
+ TExprBase ReplicatePrecompute (TDqStageBase stage, TExprContext& ctx, const TParentsMap& parentsMap) {
268
+ for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
269
+ auto input = stage.Inputs ().Item (i);
270
+ if (auto maybeConn = stage.Inputs ().Item (i).Maybe <TDqConnection>()) {
271
+ auto conn = maybeConn.Cast ();
272
+ if (conn.Maybe <TDqCnValue>() || conn.Maybe <TDqCnUnionAll>()) {
273
+ {
274
+ auto sourceStage = conn.Output ().Stage ();
275
+ if (!sourceStage.Program ().Body ().Maybe <TDqReplicate>()) {
276
+ continue ;
277
+ }
278
+
279
+ if (!FindPrecomputedOutputs (sourceStage, parentsMap)) {
280
+ continue ;
281
+ }
282
+ }
283
+
284
+ auto arg = stage.Program ().Args ().Arg (i);
285
+ auto newArg = Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
286
+ .Name (" _replaced_arg" )
287
+ .Done ();
288
+
289
+ TVector<TCoArgument> newArgs;
290
+ TNodeOnNodeOwnedMap programReplaces;
291
+ for (size_t j = 0 ; j < stage.Program ().Args ().Size (); ++j) {
292
+ auto oldArg = stage.Program ().Args ().Arg (j);
293
+ newArgs.push_back (Build<TCoArgument>(ctx, stage.Program ().Args ().Arg (i).Pos ())
294
+ .Name (" _replaced_arg_" + ToString (j))
295
+ .Done ());
296
+ if (i == j) {
297
+ programReplaces[oldArg.Raw ()] = Build<TCoToFlow>(ctx, oldArg.Pos ()).Input (newArgs.back ()).Done ().Ptr ();
298
+ } else {
299
+ programReplaces[oldArg.Raw ()] = newArgs.back ().Ptr ();
300
+ }
301
+ }
302
+
303
+ return
304
+ Build<TDqStage>(ctx, stage.Pos ())
305
+ .Inputs (ctx.ReplaceNode (stage.Inputs ().Ptr (), input.Ref (), Build<TDqPhyPrecompute>(ctx, input.Pos ()).Connection (conn).Done ().Ptr ()))
306
+ .Outputs (stage.Outputs ())
307
+ .Settings (stage.Settings ())
308
+ .Program ()
309
+ .Args (newArgs)
310
+ .Body (TExprBase (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), programReplaces)))
311
+ .Build ()
312
+ .Done ();
313
+ }
314
+ }
315
+ }
316
+ return stage;
317
+ }
318
+
319
+ NYql::IGraphTransformer::TStatus ReplicatePrecomputeRule (const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
320
+ TParentsMap parents;
321
+ GatherParents (*input, parents, true );
322
+ auto stages = CollectNodes<TDqStageBase>(input);
323
+ for (auto & stage : stages) {
324
+ auto applied = ReplicatePrecompute (stage, ctx, parents);
325
+ if (applied.Raw () != stage.Raw ()) {
326
+ output = ctx.ReplaceNode (input.Get (), stage.Ref (), applied.Ptr ());
327
+ return TStatus::Repeat;
328
+ }
329
+ }
330
+ output = input;
331
+ return TStatus::Ok;
332
+ }
333
+
225
334
template <typename TFunctor>
226
335
NYql::IGraphTransformer::TStatus PerformGlobalRule (const TString& ruleName, const NYql::TExprNode::TPtr& input,
227
336
NYql::TExprNode::TPtr& output, NYql::TExprContext& ctx, TFunctor func)
@@ -251,6 +360,8 @@ TAutoPtr<IGraphTransformer> CreateKqpFinalizingOptTransformer(const TIntrusivePt
251
360
[kqpCtx](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> TStatus {
252
361
output = input;
253
362
363
+ PERFORM_GLOBAL_RULE (" ReplicatePrecompute" , input, output, ctx, ReplicatePrecomputeRule);
364
+
254
365
PERFORM_GLOBAL_RULE (" ReplicateMultiUsedConnection" , input, output, ctx,
255
366
[](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
256
367
YQL_ENSURE (TKqlQuery::Match (input.Get ()));
0 commit comments