@@ -287,12 +287,6 @@ bool CanPropagateWideBlockThroughChannel(
287
287
return false ;
288
288
}
289
289
290
- auto outputItemType = program.Lambda ().Ref ().GetTypeAnn ()->Cast <TStreamExprType>()->GetItemType ();
291
- if (IsWideBlockType (*outputItemType)) {
292
- // output is already wide block
293
- return false ;
294
- }
295
-
296
290
YQL_ENSURE (stageSettings.OutputNarrowType );
297
291
298
292
if (!IsCompatibleWithBlocks (program.Pos (), *stageSettings.OutputNarrowType , ctx, typesCtx)) {
@@ -306,6 +300,15 @@ bool CanPropagateWideBlockThroughChannel(
306
300
return false ;
307
301
}
308
302
303
+ auto typeAnnotation = program.Lambda ().Ref ().GetTypeAnn ();
304
+
305
+ YQL_ENSURE (typeAnnotation, " Program for stage " << output.Stage ().Ref ().UniqueId () << " doesn't have type annotation" );
306
+
307
+ if (IsWideBlockType (*typeAnnotation->Cast <TStreamExprType>()->GetItemType ())) {
308
+ // output is already wide block
309
+ return false ;
310
+ }
311
+
309
312
return true ;
310
313
}
311
314
@@ -445,8 +448,6 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
445
448
.ArgsType (ExpandType (stage.Pos (), *ctx.MakeType <TTupleExprType>(argTypes), ctx))
446
449
.Done ();
447
450
448
- YQL_ENSURE (programs.emplace (stage.Ref ().UniqueId (), program).second );
449
-
450
451
const bool allowNonDeterministicFunctions = !program.Lambda ().Body ().Maybe <TKqpEffects>();
451
452
452
453
TExprNode::TPtr newProgram;
@@ -468,7 +469,7 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
468
469
}
469
470
470
471
optimizedStages.emplace (stage.Ref ().UniqueId ());
471
- programs.at (stage.Ref ().UniqueId ()) = TKqpProgram (newProgram);
472
+ YQL_ENSURE ( programs.emplace (stage.Ref ().UniqueId (), TKqpProgram (newProgram)). second );
472
473
}
473
474
474
475
TVector<TKqpParamBinding> bindings (tx.ParamBindings ().begin (), tx.ParamBindings ().end ());
0 commit comments