5
5
#include < ydb/core/kqp/gateway/kqp_gateway.h>
6
6
#include < ydb/core/kqp/host/kqp_transform.h>
7
7
#include < ydb/core/kqp/opt/kqp_opt_impl.h>
8
+ #include < ydb/core/protos/table_service_config.pb.h>
8
9
#include < ydb/library/naming_conventions/naming_conventions.h>
9
10
10
11
#include < ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
11
12
#include < ydb/library/yql/core/yql_expr_optimize.h>
12
13
#include < ydb/library/yql/core/yql_join.h>
13
14
#include < ydb/library/yql/core/yql_opt_utils.h>
14
15
#include < ydb/library/yql/dq/opt/dq_opt_peephole.h>
16
+ #include < ydb/library/yql/dq/type_ann/dq_type_ann.h>
15
17
#include < ydb/library/yql/core/services/yql_transform_pipeline.h>
16
18
#include < ydb/library/yql/providers/common/transform/yql_optimize.h>
17
19
@@ -153,7 +155,7 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
153
155
154
156
struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
155
157
TKqpPeepholePipelineConfigurator (
156
- TKikimrConfiguration::TPtr config,
158
+ TKikimrConfiguration::TPtr config,
157
159
TSet<TString> disabledOpts
158
160
)
159
161
: Config(config)
@@ -213,6 +215,60 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {
213
215
const TKikimrConfiguration::TPtr Config;
214
216
};
215
217
218
+ // TODO: copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L444
219
+ bool IsCompatibleWithBlocks (TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
220
+ TVector<const TTypeAnnotationNode*> types;
221
+ for (auto & item : type.GetItems ()) {
222
+ types.emplace_back (item->GetItemType ());
223
+ }
224
+
225
+ auto resolveStatus = typesCtx.ArrowResolver ->AreTypesSupported (ctx.GetPosition (pos), types, ctx);
226
+ YQL_ENSURE (resolveStatus != IArrowResolver::ERROR);
227
+ return resolveStatus == IArrowResolver::OK;
228
+ }
229
+
230
+ // TODO: composite copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L388
231
+ bool CanPropagateWideBlockThroughChannel (
232
+ const TDqOutput& output,
233
+ const THashMap<ui64, TKqpProgram>& programs,
234
+ const TDqStageSettings& stageSettings,
235
+ TExprContext& ctx,
236
+ TTypeAnnotationContext& typesCtx)
237
+ {
238
+ ui32 index = FromString<ui32>(output.Index ().Value ());
239
+ if (index != 0 ) {
240
+ // stage has multiple outputs
241
+ return false ;
242
+ }
243
+
244
+ const auto & program = programs.at (output.Stage ().Ref ().UniqueId ());
245
+
246
+ auto outputItemType = program.Lambda ().Ref ().GetTypeAnn ()->Cast <TStreamExprType>()->GetItemType ();
247
+ if (IsWideBlockType (*outputItemType)) {
248
+ // output is already wide block
249
+ return false ;
250
+ }
251
+
252
+ if (!stageSettings.WideChannels ) {
253
+ return false ;
254
+ }
255
+
256
+ YQL_ENSURE (stageSettings.OutputNarrowType );
257
+
258
+ if (!IsCompatibleWithBlocks (program.Pos (), *stageSettings.OutputNarrowType , ctx, typesCtx)) {
259
+ return false ;
260
+ }
261
+
262
+ // Ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
263
+ if (!program.Lambda ().Body ().Maybe <TCoFromFlow>() ||
264
+ !program.Lambda ().Body ().Cast <TCoFromFlow>().Input ().Maybe <TCoWideFromBlocks>())
265
+ {
266
+ return false ;
267
+ }
268
+
269
+ return true ;
270
+ }
271
+
216
272
TStatus PeepHoleOptimize (const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
217
273
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
218
274
bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
@@ -244,30 +300,148 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
244
300
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
245
301
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
246
302
{
247
- TVector<TDqPhyStage> stages;
248
- stages.reserve (tx.Stages ().Size ());
249
- TNodeOnNodeOwnedMap stagesMap;
250
- TVector<TKqpParamBinding> bindings (tx.ParamBindings ().begin (), tx.ParamBindings ().end ());
303
+ // Sort stages in topological order by their inputs, so that we optimize the ones without inputs first.
304
+ TVector<TDqPhyStage> topSortedStages;
305
+ topSortedStages.reserve (tx.Stages ().Size ());
306
+ {
307
+ std::function<void (const TDqPhyStage&)> topSort;
308
+ THashSet<ui64 /* uniqueId*/ > visitedStages;
309
+
310
+ // Assume there is no cycles.
311
+ topSort = [&](const TDqPhyStage& stage) {
312
+ if (visitedStages.contains (stage.Ref ().UniqueId ())) {
313
+ return ;
314
+ }
315
+
316
+ for (const auto & input : stage.Inputs ()) {
317
+ if (auto connection = input.Maybe <TDqConnection>()) {
318
+ // NOTE: somehow `Output()` is actually an input.
319
+ if (auto phyStage = connection.Cast ().Output ().Stage ().Maybe <TDqPhyStage>()) {
320
+ topSort (phyStage.Cast ());
321
+ }
322
+ }
323
+ }
324
+
325
+ visitedStages.insert (stage.Ref ().UniqueId ());
326
+ topSortedStages.push_back (stage);
327
+ };
328
+
329
+ for (const auto & stage : tx.Stages ()) {
330
+ topSort (stage);
331
+ }
332
+ }
333
+
334
+ THashMap<ui64 /* stage uid*/ , TKqpProgram> programs;
251
335
THashMap<TString, TKqpParamBinding> nonDetParamBindings;
252
336
253
- for (const auto & stage : tx. Stages () ) {
337
+ for (const auto & stage : topSortedStages ) {
254
338
YQL_ENSURE (!optimizedStages.contains (stage.Ref ().UniqueId ()));
255
339
340
+ TCoLambda lambda = stage.Program ();
341
+ TVector<TCoArgument> newArgs;
342
+ newArgs.reserve (stage.Inputs ().Size ());
343
+
344
+ // Propagate "WideFromBlock" through connections.
345
+ // TODO(ilezhankin): this peephole optimization should be implemented instead as
346
+ // the original whole-graph transformer |CreateDqBuildWideBlockChannelsTransformer|.
347
+ if (config->BlockChannelsMode != NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_SCALAR) {
348
+ TNodeOnNodeOwnedMap argsMap;
349
+
350
+ YQL_ENSURE (stage.Inputs ().Size () == stage.Program ().Args ().Size ());
351
+
352
+ bool needRebuild = false ;
353
+ for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
354
+ auto oldArg = stage.Program ().Args ().Arg (i);
355
+ auto newArg = TCoArgument (ctx.NewArgument (oldArg.Pos (), oldArg.Name ()));
356
+ newArg.MutableRef ().SetTypeAnn (oldArg.Ref ().GetTypeAnn ());
357
+ newArgs.emplace_back (newArg);
358
+
359
+ if (auto connection = stage.Inputs ().Item (i).Maybe <TDqConnection>(); connection &&
360
+ CanPropagateWideBlockThroughChannel (connection.Cast ().Output (), programs, TDqStageSettings::Parse (stage), ctx, typesCtx))
361
+ {
362
+ needRebuild = true ;
363
+
364
+ TExprNode::TPtr newArgNode = ctx.Builder (oldArg.Pos ())
365
+ .Callable (" FromFlow" )
366
+ .Callable (0 , " WideFromBlocks" )
367
+ .Callable (0 , " ToFlow" )
368
+ .Add (0 , newArg.Ptr ())
369
+ .Seal ()
370
+ .Seal ()
371
+ .Seal ()
372
+ .Build ();
373
+ argsMap.emplace (oldArg.Raw (), newArgNode);
374
+
375
+ auto stageUid = connection.Cast ().Output ().Stage ().Ref ().UniqueId ();
376
+
377
+ // Update input program with: FromFlow(WideFromBlocks($1)) → FromFlow($1)
378
+ {
379
+ const auto & inputProgram = programs.at (stageUid);
380
+
381
+ auto newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda ().Body ().Cast <TCoFromFlow>().Pos ())
382
+ .Input (inputProgram.Lambda ().Body ().Cast <TCoFromFlow>().Input ().Cast <TCoWideFromBlocks>().Input ())
383
+ .Done ();
384
+
385
+ auto newInputProgram = Build<TKqpProgram>(ctx, inputProgram.Pos ())
386
+ .Lambda ()
387
+ .Args (inputProgram.Lambda ().Args ())
388
+ .Body (newBody)
389
+ .Build ()
390
+ .ArgsType (inputProgram.ArgsType ())
391
+ .Done ();
392
+
393
+ // Run the peephole optimization on new program again to update type annotations.
394
+ // TODO(ilezhankin): refactor to run only the update of type annotations - not the whole optimization.
395
+ bool allowNonDeterministicFunctions = !newInputProgram.Lambda ().Body ().Maybe <TKqpEffects>();
396
+ TExprNode::TPtr newInputProgramNode;
397
+
398
+ auto status = PeepHoleOptimize (newInputProgram, newInputProgramNode, ctx, typeAnnTransformer, typesCtx, config,
399
+ allowNonDeterministicFunctions, withFinalStageRules, disabledOpts);
400
+ if (status != TStatus::Ok) {
401
+ ctx.AddError (TIssue (ctx.GetPosition (stage.Pos ()), " Peephole optimization failed for KQP transaction" ));
402
+ return {};
403
+ }
404
+
405
+ programs.at (stageUid) = TKqpProgram (newInputProgramNode);
406
+ }
407
+
408
+ // Update the type annotation for an argument with return value of the input program.
409
+ newArg.MutableRef ().SetTypeAnn (programs.at (stageUid).Lambda ().Body ().Ref ().GetTypeAnn ());
410
+ } else {
411
+ argsMap.emplace (oldArg.Raw (), newArg.Ptr ());
412
+ }
413
+ }
414
+
415
+ if (needRebuild) {
416
+ lambda = Build<TCoLambda>(ctx, lambda.Pos ())
417
+ .Args (newArgs)
418
+ .Body (ctx.ReplaceNodes (stage.Program ().Body ().Ptr (), argsMap))
419
+ .Done ();
420
+ }
421
+ } else {
422
+ for (size_t i = 0 ; i < stage.Inputs ().Size (); ++i) {
423
+ auto oldArg = stage.Program ().Args ().Arg (i);
424
+ auto newArg = TCoArgument (ctx.NewArgument (oldArg.Pos (), oldArg.Name ()));
425
+ newArg.MutableRef ().SetTypeAnn (oldArg.Ref ().GetTypeAnn ());
426
+ newArgs.emplace_back (newArg);
427
+ }
428
+ }
429
+
256
430
TVector<const TTypeAnnotationNode*> argTypes;
257
- for (const auto & arg : stage. Program (). Args () ) {
431
+ for (const auto & arg : newArgs ) {
258
432
YQL_ENSURE (arg.Ref ().GetTypeAnn ());
259
433
argTypes.push_back (arg.Ref ().GetTypeAnn ());
260
434
}
261
435
262
436
// TODO: get rid of TKqpProgram-callable (YQL-10078)
263
- TNodeOnNodeOwnedMap tmp;
264
437
auto program = Build<TKqpProgram>(ctx, stage.Pos ())
265
- // .Lambda(ctx.DeepCopy(stage.Program().Ref(), ctx, tmp, true /* internStrings */, false /* copyTypes */))
266
- .Lambda (stage.Program ())
438
+ .Lambda (lambda)
267
439
.ArgsType (ExpandType (stage.Pos (), *ctx.MakeType <TTupleExprType>(argTypes), ctx))
268
440
.Done ();
269
441
270
- bool allowNonDeterministicFunctions = !stage.Program ().Body ().Maybe <TKqpEffects>();
442
+ YQL_ENSURE (programs.emplace (stage.Ref ().UniqueId (), program).second );
443
+
444
+ const bool allowNonDeterministicFunctions = !program.Lambda ().Body ().Maybe <TKqpEffects>();
271
445
272
446
TExprNode::TPtr newProgram;
273
447
auto status = PeepHoleOptimize (program, newProgram, ctx, typeAnnTransformer, typesCtx, config,
@@ -287,26 +461,34 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
287
461
}
288
462
}
289
463
290
- auto newStage = Build<TDqPhyStage>(ctx, stage.Pos ())
291
- .Inputs (ctx.ReplaceNodes (stage.Inputs ().Ptr (), stagesMap))
292
- .Program (ctx.DeepCopyLambda (TKqpProgram (newProgram).Lambda ().Ref ()))
293
- .Settings (stage.Settings ())
294
- .Outputs (stage.Outputs ())
295
- .Done ();
296
-
297
- stages.emplace_back (newStage);
298
- stagesMap.emplace (stage.Raw (), newStage.Ptr ());
299
-
300
464
optimizedStages.emplace (stage.Ref ().UniqueId ());
465
+ programs.at (stage.Ref ().UniqueId ()) = TKqpProgram (newProgram);
301
466
}
302
467
468
+ TVector<TKqpParamBinding> bindings (tx.ParamBindings ().begin (), tx.ParamBindings ().end ());
469
+
303
470
for (const auto & [_, binding] : nonDetParamBindings) {
304
471
bindings.emplace_back (std::move (binding));
305
472
}
306
473
474
+ TVector<TDqPhyStage> newStages;
475
+ TNodeOnNodeOwnedMap stagesMap;
476
+
477
+ // Rebuild stages only after all new programs are ready.
478
+ for (const auto & stage : topSortedStages) {
479
+ auto newStage = Build<TDqPhyStage>(ctx, stage.Pos ())
480
+ .InitFrom (stage)
481
+ .Inputs (ctx.ReplaceNodes (stage.Inputs ().Ptr (), stagesMap))
482
+ .Program (ctx.DeepCopyLambda (programs.at (stage.Ref ().UniqueId ()).Lambda ().Ref ()))
483
+ .Done ();
484
+
485
+ newStages.emplace_back (newStage);
486
+ stagesMap.emplace (stage.Raw (), newStage.Ptr ());
487
+ }
488
+
307
489
return Build<TKqpPhysicalTx>(ctx, tx.Pos ())
308
490
.Stages ()
309
- .Add (stages )
491
+ .Add (newStages )
310
492
.Build ()
311
493
.Results (ctx.ReplaceNodes (tx.Results ().Ptr (), stagesMap))
312
494
.ParamBindings ().Add (bindings).Build ()
@@ -318,7 +500,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
318
500
public:
319
501
TKqpTxPeepholeTransformer (
320
502
IGraphTransformer* typeAnnTransformer,
321
- TTypeAnnotationContext& typesCtx,
503
+ TTypeAnnotationContext& typesCtx,
322
504
TKikimrConfiguration::TPtr config,
323
505
bool withFinalStageRules,
324
506
TSet<TString> disabledOpts
@@ -444,8 +626,8 @@ class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {
444
626
445
627
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer (
446
628
NYql::IGraphTransformer* typeAnnTransformer,
447
- TTypeAnnotationContext& typesCtx,
448
- const TKikimrConfiguration::TPtr& config,
629
+ TTypeAnnotationContext& typesCtx,
630
+ const TKikimrConfiguration::TPtr& config,
449
631
bool withFinalStageRules,
450
632
TSet<TString> disabledOpts
451
633
)
@@ -455,7 +637,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
455
637
456
638
TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer (
457
639
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
458
- TTypeAnnotationContext& typesCtx,
640
+ TTypeAnnotationContext& typesCtx,
459
641
const TKikimrConfiguration::TPtr& config
460
642
)
461
643
{
0 commit comments