@@ -215,6 +215,39 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {
215
215
const TKikimrConfiguration::TPtr Config;
216
216
};
217
217
218
+ // Sort stages in topological order by their inputs, so that we optimize the ones without inputs first.
219
+ TVector<TDqPhyStage> TopSortStages (const TDqPhyStageList& stages) {
220
+ TVector<TDqPhyStage> topSortedStages;
221
+ topSortedStages.reserve (stages.Size ());
222
+ std::function<void (const TDqPhyStage&)> topSort;
223
+ THashSet<ui64 /* uniqueId*/ > visitedStages;
224
+
225
+ // Assume there is no cycles.
226
+ topSort = [&](const TDqPhyStage& stage) {
227
+ if (visitedStages.contains (stage.Ref ().UniqueId ())) {
228
+ return ;
229
+ }
230
+
231
+ for (const auto & input : stage.Inputs ()) {
232
+ if (auto connection = input.Maybe <TDqConnection>()) {
233
+ // NOTE: somehow `Output()` is actually an input.
234
+ if (auto phyStage = connection.Cast ().Output ().Stage ().Maybe <TDqPhyStage>()) {
235
+ topSort (phyStage.Cast ());
236
+ }
237
+ }
238
+ }
239
+
240
+ visitedStages.insert (stage.Ref ().UniqueId ());
241
+ topSortedStages.push_back (stage);
242
+ };
243
+
244
+ for (const auto & stage : stages) {
245
+ topSort (stage);
246
+ }
247
+
248
+ return topSortedStages;
249
+ }
250
+
218
251
// TODO: copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L444
219
252
bool IsCompatibleWithBlocks (TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
220
253
TVector<const TTypeAnnotationNode*> types;
@@ -300,40 +333,10 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
300
333
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
301
334
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
302
335
{
303
- // Sort stages in topological order by their inputs, so that we optimize the ones without inputs first.
304
- TVector<TDqPhyStage> topSortedStages;
305
- topSortedStages.reserve (tx.Stages ().Size ());
306
- {
307
- std::function<void (const TDqPhyStage&)> topSort;
308
- THashSet<ui64 /* uniqueId*/ > visitedStages;
309
-
310
- // Assume there is no cycles.
311
- topSort = [&](const TDqPhyStage& stage) {
312
- if (visitedStages.contains (stage.Ref ().UniqueId ())) {
313
- return ;
314
- }
315
-
316
- for (const auto & input : stage.Inputs ()) {
317
- if (auto connection = input.Maybe <TDqConnection>()) {
318
- // NOTE: somehow `Output()` is actually an input.
319
- if (auto phyStage = connection.Cast ().Output ().Stage ().Maybe <TDqPhyStage>()) {
320
- topSort (phyStage.Cast ());
321
- }
322
- }
323
- }
324
-
325
- visitedStages.insert (stage.Ref ().UniqueId ());
326
- topSortedStages.push_back (stage);
327
- };
328
-
329
- for (const auto & stage : tx.Stages ()) {
330
- topSort (stage);
331
- }
332
- }
333
-
334
336
THashMap<ui64 /* stage uid*/ , TKqpProgram> programs;
335
337
THashMap<TString, TKqpParamBinding> nonDetParamBindings;
336
338
339
+ const auto topSortedStages = TopSortStages (tx.Stages ());
337
340
for (const auto & stage : topSortedStages) {
338
341
YQL_ENSURE (!optimizedStages.contains (stage.Ref ().UniqueId ()));
339
342
@@ -349,7 +352,6 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
349
352
350
353
YQL_ENSURE (stage.Inputs ().Size () == stage.Program ().Args ().Size ());
351
354
352
- bool needRebuild = false ;
353
355
for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
354
356
auto oldArg = stage.Program ().Args ().Arg (i);
355
357
auto newArg = TCoArgument (ctx.NewArgument (oldArg.Pos (), oldArg.Name ()));
@@ -359,8 +361,6 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
359
361
if (auto connection = stage.Inputs ().Item (i).Maybe <TDqConnection>(); connection &&
360
362
CanPropagateWideBlockThroughChannel (connection.Cast ().Output (), programs, TDqStageSettings::Parse (stage), ctx, typesCtx))
361
363
{
362
- needRebuild = true ;
363
-
364
364
TExprNode::TPtr newArgNode = ctx.Builder (oldArg.Pos ())
365
365
.Callable (" FromFlow" )
366
366
.Callable (0 , " WideFromBlocks" )
@@ -412,12 +412,11 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
412
412
}
413
413
}
414
414
415
- if (needRebuild) {
416
- lambda = Build<TCoLambda>(ctx, lambda.Pos ())
417
- .Args (newArgs)
418
- .Body (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), argsMap))
419
- .Done ();
420
- }
415
+ // Rebuild lambda with new arguments.
416
+ lambda = Build<TCoLambda>(ctx, lambda.Pos ())
417
+ .Args (newArgs)
418
+ .Body (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), argsMap))
419
+ .Done ();
421
420
} else {
422
421
for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
423
422
auto oldArg = stage.Program ().Args ().Arg (i);
0 commit comments