Skip to content

Commit 4d1e99e

Browse files
authored
Propagate wide-block through channels (ydb-platform#6096)
1 parent afddf72 commit 4d1e99e

File tree

3 files changed

+273
-27
lines changed

3 files changed

+273
-27
lines changed

ydb/core/kqp/opt/kqp_opt_build_txs.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
578578
.Add(*TypeAnnTransformer, "TypeAnnotation")
579579
.AddPostTypeAnnotation(/* forSubgraph */ true)
580580
.Add(CreateKqpBuildPhyStagesTransformer(enableSpillingGenericQuery, typesCtx, config->BlockChannelsMode), "BuildPhysicalStages")
581+
// TODO(ilezhankin): "BuildWideBlockChannels" transformer is required only for BLOCK_CHANNELS_FORCE mode.
581582
.Add(CreateKqpBuildWideBlockChannelsTransformer(typesCtx, config->BlockChannelsMode), "BuildWideBlockChannels")
582583
.Add(*BuildTxTransformer, "BuildPhysicalTx")
583584
.Add(CreateKqpTxPeepholeTransformer(

ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp

Lines changed: 207 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
#include <ydb/core/kqp/gateway/kqp_gateway.h>
66
#include <ydb/core/kqp/host/kqp_transform.h>
77
#include <ydb/core/kqp/opt/kqp_opt_impl.h>
8+
#include <ydb/core/protos/table_service_config.pb.h>
89
#include <ydb/library/naming_conventions/naming_conventions.h>
910

1011
#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
1112
#include <ydb/library/yql/core/yql_expr_optimize.h>
1213
#include <ydb/library/yql/core/yql_join.h>
1314
#include <ydb/library/yql/core/yql_opt_utils.h>
1415
#include <ydb/library/yql/dq/opt/dq_opt_peephole.h>
16+
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
1517
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
1618
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
1719

@@ -153,7 +155,7 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
153155

154156
struct TKqpPeepholePipelineConfigurator : IPipelineConfigurator {
155157
TKqpPeepholePipelineConfigurator(
156-
TKikimrConfiguration::TPtr config,
158+
TKikimrConfiguration::TPtr config,
157159
TSet<TString> disabledOpts
158160
)
159161
: Config(config)
@@ -213,6 +215,93 @@ struct TKqpPeepholePipelineFinalConfigurator : IPipelineConfigurator {
213215
const TKikimrConfiguration::TPtr Config;
214216
};
215217

218+
// Sort stages in topological order by their inputs, so that we optimize the ones without inputs first.
219+
TVector<TDqPhyStage> TopSortStages(const TDqPhyStageList& stages) {
220+
TVector<TDqPhyStage> topSortedStages;
221+
topSortedStages.reserve(stages.Size());
222+
std::function<void(const TDqPhyStage&)> topSort;
223+
THashSet<ui64 /*uniqueId*/> visitedStages;
224+
225+
// Assume there is no cycles.
226+
topSort = [&](const TDqPhyStage& stage) {
227+
if (visitedStages.contains(stage.Ref().UniqueId())) {
228+
return;
229+
}
230+
231+
for (const auto& input : stage.Inputs()) {
232+
if (auto connection = input.Maybe<TDqConnection>()) {
233+
// NOTE: somehow `Output()` is actually an input.
234+
if (auto phyStage = connection.Cast().Output().Stage().Maybe<TDqPhyStage>()) {
235+
topSort(phyStage.Cast());
236+
}
237+
}
238+
}
239+
240+
visitedStages.insert(stage.Ref().UniqueId());
241+
topSortedStages.push_back(stage);
242+
};
243+
244+
for (const auto& stage : stages) {
245+
topSort(stage);
246+
}
247+
248+
return topSortedStages;
249+
}
250+
251+
// TODO: copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L444
252+
bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TExprContext& ctx, TTypeAnnotationContext& typesCtx) {
253+
TVector<const TTypeAnnotationNode*> types;
254+
for (auto& item : type.GetItems()) {
255+
types.emplace_back(item->GetItemType());
256+
}
257+
258+
auto resolveStatus = typesCtx.ArrowResolver->AreTypesSupported(ctx.GetPosition(pos), types, ctx);
259+
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);
260+
return resolveStatus == IArrowResolver::OK;
261+
}
262+
263+
// TODO: composite copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L388
264+
bool CanPropagateWideBlockThroughChannel(
265+
const TDqOutput& output,
266+
const THashMap<ui64, TKqpProgram>& programs,
267+
const TDqStageSettings& stageSettings,
268+
TExprContext& ctx,
269+
TTypeAnnotationContext& typesCtx)
270+
{
271+
const auto& program = programs.at(output.Stage().Ref().UniqueId());
272+
273+
ui32 index = FromString<ui32>(output.Index().Value());
274+
if (index != 0) {
275+
// stage has multiple outputs
276+
return false;
277+
}
278+
279+
auto outputItemType = program.Lambda().Ref().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType();
280+
if (IsWideBlockType(*outputItemType)) {
281+
// output is already wide block
282+
return false;
283+
}
284+
285+
if (!stageSettings.WideChannels) {
286+
return false;
287+
}
288+
289+
YQL_ENSURE(stageSettings.OutputNarrowType);
290+
291+
if (!IsCompatibleWithBlocks(program.Pos(), *stageSettings.OutputNarrowType, ctx, typesCtx)) {
292+
return false;
293+
}
294+
295+
// Ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
296+
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
297+
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
298+
{
299+
return false;
300+
}
301+
302+
return true;
303+
}
304+
216305
TStatus PeepHoleOptimize(const TExprBase& program, TExprNode::TPtr& newProgram, TExprContext& ctx,
217306
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, TKikimrConfiguration::TPtr config,
218307
bool allowNonDeterministicFunctions, bool withFinalStageRules, TSet<TString> disabledOpts)
@@ -244,30 +333,114 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
244333
IGraphTransformer& typeAnnTransformer, TTypeAnnotationContext& typesCtx, THashSet<ui64>& optimizedStages,
245334
TKikimrConfiguration::TPtr config, bool withFinalStageRules, TSet<TString> disabledOpts)
246335
{
247-
TVector<TDqPhyStage> stages;
248-
stages.reserve(tx.Stages().Size());
249-
TNodeOnNodeOwnedMap stagesMap;
250-
TVector<TKqpParamBinding> bindings(tx.ParamBindings().begin(), tx.ParamBindings().end());
336+
THashMap<ui64 /*stage uid*/, TKqpProgram> programs;
251337
THashMap<TString, TKqpParamBinding> nonDetParamBindings;
252338

253-
for (const auto& stage : tx.Stages()) {
339+
const auto topSortedStages = TopSortStages(tx.Stages());
340+
for (const auto& stage : topSortedStages) {
254341
YQL_ENSURE(!optimizedStages.contains(stage.Ref().UniqueId()));
255342

343+
TCoLambda lambda = stage.Program();
344+
TVector<TCoArgument> newArgs;
345+
newArgs.reserve(stage.Inputs().Size());
346+
347+
// Propagate "WideFromBlocks" through connections.
348+
// TODO(ilezhankin): this peephole optimization should be implemented instead as
349+
// the original whole-graph transformer |CreateDqBuildWideBlockChannelsTransformer|.
350+
if (config->BlockChannelsMode == NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_AUTO) {
351+
TNodeOnNodeOwnedMap argsMap;
352+
353+
YQL_ENSURE(stage.Inputs().Size() == stage.Program().Args().Size());
354+
355+
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
356+
auto oldArg = stage.Program().Args().Arg(i);
357+
auto newArg = TCoArgument(ctx.NewArgument(oldArg.Pos(), oldArg.Name()));
358+
newArg.MutableRef().SetTypeAnn(oldArg.Ref().GetTypeAnn());
359+
newArgs.emplace_back(newArg);
360+
361+
if (auto connection = stage.Inputs().Item(i).Maybe<TDqConnection>(); connection &&
362+
CanPropagateWideBlockThroughChannel(connection.Cast().Output(), programs, TDqStageSettings::Parse(stage), ctx, typesCtx))
363+
{
364+
TExprNode::TPtr newArgNode = ctx.Builder(oldArg.Pos())
365+
.Callable("FromFlow")
366+
.Callable(0, "WideFromBlocks")
367+
.Callable(0, "ToFlow")
368+
.Add(0, newArg.Ptr())
369+
.Seal()
370+
.Seal()
371+
.Seal()
372+
.Build();
373+
argsMap.emplace(oldArg.Raw(), newArgNode);
374+
375+
auto stageUid = connection.Cast().Output().Stage().Ref().UniqueId();
376+
377+
// Update input program with: FromFlow(WideFromBlocks($1)) → FromFlow($1)
378+
if (const auto& inputProgram = programs.at(stageUid); inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
379+
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
380+
{
381+
auto newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
382+
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
383+
.Done();
384+
385+
auto newInputProgram = Build<TKqpProgram>(ctx, inputProgram.Pos())
386+
.Lambda()
387+
.Args(inputProgram.Lambda().Args())
388+
.Body(newBody)
389+
.Build()
390+
.ArgsType(inputProgram.ArgsType())
391+
.Done();
392+
393+
// Run the peephole optimization on new program again to update type annotations.
394+
// TODO(ilezhankin): refactor to run only the update of type annotations - not the whole optimization.
395+
bool allowNonDeterministicFunctions = !newInputProgram.Lambda().Body().Maybe<TKqpEffects>();
396+
TExprNode::TPtr newInputProgramNode;
397+
398+
auto status = PeepHoleOptimize(newInputProgram, newInputProgramNode, ctx, typeAnnTransformer, typesCtx, config,
399+
allowNonDeterministicFunctions, withFinalStageRules, disabledOpts);
400+
if (status != TStatus::Ok) {
401+
ctx.AddError(TIssue(ctx.GetPosition(stage.Pos()), "Peephole optimization failed for KQP transaction"));
402+
return {};
403+
}
404+
405+
programs.at(stageUid) = TKqpProgram(newInputProgramNode);
406+
}
407+
408+
// Update the type annotation for an argument with return value of the input program.
409+
newArg.MutableRef().SetTypeAnn(programs.at(stageUid).Lambda().Body().Ref().GetTypeAnn());
410+
} else {
411+
argsMap.emplace(oldArg.Raw(), newArg.Ptr());
412+
}
413+
}
414+
415+
// Rebuild lambda with new arguments.
416+
lambda = Build<TCoLambda>(ctx, lambda.Pos())
417+
.Args(newArgs)
418+
.Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), argsMap))
419+
.Done();
420+
} else {
421+
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
422+
auto oldArg = stage.Program().Args().Arg(i);
423+
auto newArg = TCoArgument(ctx.NewArgument(oldArg.Pos(), oldArg.Name()));
424+
newArg.MutableRef().SetTypeAnn(oldArg.Ref().GetTypeAnn());
425+
newArgs.emplace_back(newArg);
426+
}
427+
}
428+
256429
TVector<const TTypeAnnotationNode*> argTypes;
257-
for (const auto& arg : stage.Program().Args()) {
430+
for (const auto& arg : newArgs) {
258431
YQL_ENSURE(arg.Ref().GetTypeAnn());
259432
argTypes.push_back(arg.Ref().GetTypeAnn());
260433
}
261434

262435
// TODO: get rid of TKqpProgram-callable (YQL-10078)
263-
TNodeOnNodeOwnedMap tmp;
264436
auto program = Build<TKqpProgram>(ctx, stage.Pos())
265-
//.Lambda(ctx.DeepCopy(stage.Program().Ref(), ctx, tmp, true /* internStrings */, false /* copyTypes */))
266-
.Lambda(stage.Program())
437+
.Lambda(lambda)
267438
.ArgsType(ExpandType(stage.Pos(), *ctx.MakeType<TTupleExprType>(argTypes), ctx))
268439
.Done();
269440

270-
bool allowNonDeterministicFunctions = !stage.Program().Body().Maybe<TKqpEffects>();
441+
YQL_ENSURE(programs.emplace(stage.Ref().UniqueId(), program).second);
442+
443+
const bool allowNonDeterministicFunctions = !program.Lambda().Body().Maybe<TKqpEffects>();
271444

272445
TExprNode::TPtr newProgram;
273446
auto status = PeepHoleOptimize(program, newProgram, ctx, typeAnnTransformer, typesCtx, config,
@@ -287,26 +460,34 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
287460
}
288461
}
289462

290-
auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
291-
.Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap))
292-
.Program(ctx.DeepCopyLambda(TKqpProgram(newProgram).Lambda().Ref()))
293-
.Settings(stage.Settings())
294-
.Outputs(stage.Outputs())
295-
.Done();
296-
297-
stages.emplace_back(newStage);
298-
stagesMap.emplace(stage.Raw(), newStage.Ptr());
299-
300463
optimizedStages.emplace(stage.Ref().UniqueId());
464+
programs.at(stage.Ref().UniqueId()) = TKqpProgram(newProgram);
301465
}
302466

467+
TVector<TKqpParamBinding> bindings(tx.ParamBindings().begin(), tx.ParamBindings().end());
468+
303469
for (const auto& [_, binding] : nonDetParamBindings) {
304470
bindings.emplace_back(std::move(binding));
305471
}
306472

473+
TVector<TDqPhyStage> newStages;
474+
TNodeOnNodeOwnedMap stagesMap;
475+
476+
// Rebuild stages only after all new programs are ready.
477+
for (const auto& stage : topSortedStages) {
478+
auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
479+
.InitFrom(stage)
480+
.Inputs(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesMap))
481+
.Program(ctx.DeepCopyLambda(programs.at(stage.Ref().UniqueId()).Lambda().Ref()))
482+
.Done();
483+
484+
newStages.emplace_back(newStage);
485+
stagesMap.emplace(stage.Raw(), newStage.Ptr());
486+
}
487+
307488
return Build<TKqpPhysicalTx>(ctx, tx.Pos())
308489
.Stages()
309-
.Add(stages)
490+
.Add(newStages)
310491
.Build()
311492
.Results(ctx.ReplaceNodes(tx.Results().Ptr(), stagesMap))
312493
.ParamBindings().Add(bindings).Build()
@@ -318,7 +499,7 @@ class TKqpTxPeepholeTransformer : public TSyncTransformerBase {
318499
public:
319500
TKqpTxPeepholeTransformer(
320501
IGraphTransformer* typeAnnTransformer,
321-
TTypeAnnotationContext& typesCtx,
502+
TTypeAnnotationContext& typesCtx,
322503
TKikimrConfiguration::TPtr config,
323504
bool withFinalStageRules,
324505
TSet<TString> disabledOpts
@@ -444,8 +625,8 @@ class TKqpTxsPeepholeTransformer : public TSyncTransformerBase {
444625

445626
TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
446627
NYql::IGraphTransformer* typeAnnTransformer,
447-
TTypeAnnotationContext& typesCtx,
448-
const TKikimrConfiguration::TPtr& config,
628+
TTypeAnnotationContext& typesCtx,
629+
const TKikimrConfiguration::TPtr& config,
449630
bool withFinalStageRules,
450631
TSet<TString> disabledOpts
451632
)
@@ -455,7 +636,7 @@ TAutoPtr<IGraphTransformer> CreateKqpTxPeepholeTransformer(
455636

456637
TAutoPtr<IGraphTransformer> CreateKqpTxsPeepholeTransformer(
457638
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
458-
TTypeAnnotationContext& typesCtx,
639+
TTypeAnnotationContext& typesCtx,
459640
const TKikimrConfiguration::TPtr& config
460641
)
461642
{

0 commit comments

Comments
 (0)