Skip to content

Commit 48e8f3d

Browse files
authored
[dq] Don't make physical operations for callables with outer dependencies (#5771)
1 parent d730690 commit 48e8f3d

File tree

5 files changed

+38
-75
lines changed

5 files changed

+38
-75
lines changed

ydb/library/yql/dq/opt/dq_opt.cpp

+2 -42
Original file line number | Diff line number | Diff line change
@@ -130,47 +130,7 @@ bool IsDqPureExpr(const TExprBase& node, bool isPrecomputePure) {
130130
}
131131

132132
bool IsDqSelfContainedExpr(const TExprBase& node) {
133-
bool selfContained = true;
134-
TNodeSet knownArguments;
135-
136-
VisitExpr(node.Ptr(),
137-
[&selfContained, &knownArguments] (const TExprNode::TPtr& node) {
138-
if (!selfContained) {
139-
return false;
140-
}
141-
142-
if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) {
143-
for (const auto& arg : maybeLambda.Cast().Args()) {
144-
YQL_ENSURE(knownArguments.emplace(arg.Raw()).second);
145-
}
146-
}
147-
148-
if (node->IsArgument()) {
149-
if (!knownArguments.contains(node.Get())) {
150-
selfContained = false;
151-
return false;
152-
}
153-
}
154-
155-
return true;
156-
},
157-
[&selfContained, &knownArguments] (const TExprNode::TPtr& node) {
158-
if (!selfContained) {
159-
return false;
160-
}
161-
162-
if (auto maybeLambda = TMaybeNode<TCoLambda>(node)) {
163-
for (const auto& arg : maybeLambda.Cast().Args()) {
164-
auto it = knownArguments.find(arg.Raw());
165-
YQL_ENSURE(it != knownArguments.end());
166-
knownArguments.erase(it);
167-
}
168-
}
169-
170-
return true;
171-
});
172-
173-
return selfContained;
133+
return node.Ref().IsComplete();
174134
}
175135

176136
bool IsDqDependsOnStage(const TExprBase& node, const TDqStageBase& stage) {
@@ -193,7 +153,7 @@ bool IsDqDependsOnStageOutput(const TExprBase& node, const TDqStageBase& stage,
193153
}
194154

195155
bool CanPushDqExpr(const TExprBase& expr, const TDqStageBase& stage) {
196-
return IsDqPureExpr(expr, true) && !IsDqDependsOnStage(expr, stage);
156+
return IsDqCompletePureExpr(expr, true) && !IsDqDependsOnStage(expr, stage);
197157
}
198158

199159
bool CanPushDqExpr(const TExprBase& expr, const TDqConnection& connection) {

ydb/library/yql/dq/opt/dq_opt.h

+4
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,10 @@ ui32 GetStageOutputsCount(const NNodes::TDqStageBase& stage);
2727
void FindDqConnections(const NNodes::TExprBase& node, TVector<NNodes::TDqConnection>& connections, bool& isPure);
2828
bool DqStageFirstInputIsBroadcast(const NNodes::TDqStageBase& stage);
2929
bool IsDqPureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true);
30+
inline bool IsDqCompletePureExpr(const NNodes::TExprBase& node, bool isPrecomputePure = true) {
31+
return node.Ref().IsComplete() && IsDqPureExpr(node, isPrecomputePure);
32+
}
33+
3034
bool IsDqSelfContainedExpr(const NNodes::TExprBase& node);
3135
bool IsDqDependsOnStage(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage);
3236
bool IsDqDependsOnStageOutput(const NNodes::TExprBase& node, const NNodes::TDqStageBase& stage, ui32 outputIndex);

ydb/library/yql/dq/opt/dq_opt_join.cpp

+5 -5
Original file line number | Diff line number | Diff line change
@@ -560,7 +560,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
560560
return node;
561561
}
562562

563-
if (!IsDqPureExpr(join.LeftInput())) {
563+
if (!IsDqCompletePureExpr(join.LeftInput())) {
564564
return node;
565565
}
566566

@@ -599,7 +599,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
599599
.JoinType().Build(joinType)
600600
.LeftJoinKeyNames(join.LeftJoinKeyNames())
601601
.RightJoinKeyNames(join.RightJoinKeyNames())
602-
.Done();
602+
.Done();
603603
}
604604

605605
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) {
@@ -621,7 +621,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
621621
TDqCnUnionAll leftCn = join.LeftInput().Cast<TDqCnUnionAll>();
622622

623623
TMaybeNode<TDqCnUnionAll> rightCn = join.RightInput().Maybe<TDqCnUnionAll>();
624-
YQL_ENSURE(rightCn || IsDqPureExpr(join.RightInput(), /* isPrecomputePure */ true));
624+
YQL_ENSURE(rightCn || IsDqCompletePureExpr(join.RightInput(), /* isPrecomputePure */ true));
625625

626626
TMaybeNode<TDqCnBroadcast> rightBroadcast;
627627
TNodeOnNodeOwnedMap rightPrecomputes;
@@ -875,7 +875,7 @@ TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) {
875875
}
876876

877877
// join stream with pure expr
878-
else if (leftIsUnionAll && IsDqPureExpr(join.RightInput(), /* isPrecomputePure */ true)) {
878+
else if (leftIsUnionAll && IsDqCompletePureExpr(join.RightInput(), /* isPrecomputePure */ true)) {
879879
auto leftCn = join.LeftInput().Cast<TDqCnUnionAll>();
880880

881881
auto [leftJoinKeys, _] = GetJoinKeys(join, ctx);
@@ -909,7 +909,7 @@ TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) {
909909
}
910910

911911
// join pure expr with stream
912-
else if (IsDqPureExpr(join.RightInput(), /* isPrecomputePure */ true) && rightIsUnionAll) {
912+
else if (IsDqCompletePureExpr(join.RightInput(), /* isPrecomputePure */ true) && rightIsUnionAll) {
913913
auto rightCn = join.RightInput().Cast<TDqCnUnionAll>();
914914

915915
auto [_, rightJoinKeys] = GetJoinKeys(join, ctx);

ydb/library/yql/dq/opt/dq_opt_log.cpp

+2 -2
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ TExprBase DqRewriteTakeSortToTopSort(TExprBase node, TExprContext& ctx, const TP
3737
}
3838
auto take = node.Cast<TCoTake>();
3939

40-
if (!IsDqPureExpr(take.Count())) {
40+
if (!IsDqCompletePureExpr(take.Count())) {
4141
return node;
4242
}
4343

@@ -51,7 +51,7 @@ TExprBase DqRewriteTakeSortToTopSort(TExprBase node, TExprContext& ctx, const TP
5151
return node;
5252
}
5353

54-
if (!IsDqPureExpr(maybeSkip.Cast().Count())) {
54+
if (!IsDqCompletePureExpr(maybeSkip.Cast().Count())) {
5555
return node;
5656
}
5757
}

ydb/library/yql/dq/opt/dq_opt_phy.cpp

+25 -26
Original file line number | Diff line number | Diff line change
@@ -62,7 +62,7 @@ TMaybeNode<TCoMux> ConvertMuxArgumentsToFlows(TCoMux node, TExprContext& ctx) {
6262
if (child.Maybe<TDqConnection>().IsValid()) {
6363
muxArgs.push_back(child);
6464
}
65-
else if (IsDqPureExpr(child)) {
65+
else if (IsDqCompletePureExpr(child)) {
6666
muxArgs.push_back(Build<TCoToFlow>(ctx, node.Pos())
6767
.Input(child)
6868
.Done());
@@ -242,9 +242,9 @@ TExprBase DqBuildPartitionsStageStub(TExprBase node, TExprContext& ctx, IOptimiz
242242
}
243243

244244
auto partition = node.Cast<TPartition>();
245-
if (!IsDqPureExpr(partition.KeySelectorLambda()) ||
246-
!IsDqPureExpr(partition.ListHandlerLambda()) ||
247-
!IsDqPureExpr(partition.SortKeySelectorLambda()))
245+
if (!IsDqCompletePureExpr(partition.KeySelectorLambda()) ||
246+
!IsDqCompletePureExpr(partition.ListHandlerLambda()) ||
247+
!IsDqCompletePureExpr(partition.SortKeySelectorLambda()))
248248
{
249249
return node;
250250
}
@@ -710,11 +710,7 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
710710

711711
auto flatmap = node.Cast<TCoFlatMapBase>();
712712

713-
if (!IsDqPureExpr(flatmap.Input())) {
714-
return node;
715-
}
716-
717-
if (!IsDqSelfContainedExpr(flatmap.Input())) {
713+
if (!IsDqCompletePureExpr(flatmap.Input()) || !IsDqSelfContainedExpr(flatmap.Lambda())) {
718714
return node;
719715
}
720716

@@ -757,6 +753,9 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
757753
}
758754

759755
auto flatmap = node.Cast<TCoFlatMapBase>();
756+
if (!IsDqSelfContainedExpr(flatmap.Lambda())) {
757+
return node;
758+
}
760759
auto dqUnion = flatmap.Input().Cast<TDqCnUnionAll>();
761760
if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
762761
return node;
@@ -914,7 +913,7 @@ TExprBase DqBuildLMapOverMuxStageStub(TExprBase node, TExprContext& ctx, NYql::I
914913
return node;
915914
}
916915

917-
if (!IsDqPureExpr(lmap.Lambda())) {
916+
if (!IsDqCompletePureExpr(lmap.Lambda())) {
918917
return node;
919918
}
920919

@@ -991,11 +990,11 @@ TExprBase DqPushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationC
991990
return node;
992991
}
993992

994-
if (!IsDqPureExpr(combine.PreMapLambda()) ||
995-
!IsDqPureExpr(combine.KeySelectorLambda()) ||
996-
!IsDqPureExpr(combine.InitHandlerLambda()) ||
997-
!IsDqPureExpr(combine.UpdateHandlerLambda()) ||
998-
!IsDqPureExpr(combine.FinishHandlerLambda()))
993+
if (!IsDqCompletePureExpr(combine.PreMapLambda()) ||
994+
!IsDqCompletePureExpr(combine.KeySelectorLambda()) ||
995+
!IsDqCompletePureExpr(combine.InitHandlerLambda()) ||
996+
!IsDqCompletePureExpr(combine.UpdateHandlerLambda()) ||
997+
!IsDqCompletePureExpr(combine.FinishHandlerLambda()))
999998
{
1000999
return node;
10011000
}
@@ -1126,8 +1125,8 @@ TExprBase DqBuildShuffleStage(TExprBase node, TExprContext& ctx, IOptimizationCo
11261125
}
11271126

11281127
auto shuffle = node.Cast<TCoShuffleByKeys>();
1129-
if (!IsDqPureExpr(shuffle.KeySelectorLambda()) ||
1130-
!IsDqPureExpr(shuffle.ListHandlerLambda()))
1128+
if (!IsDqCompletePureExpr(shuffle.KeySelectorLambda()) ||
1129+
!IsDqCompletePureExpr(shuffle.ListHandlerLambda()))
11311130
{
11321131
return node;
11331132
}
@@ -1753,7 +1752,7 @@ TExprBase DqBuildSortStage(TExprBase node, TExprContext& ctx, IOptimizationConte
17531752
TVector<const TTypeAnnotationNode*> sortKeyTypes;
17541753

17551754
auto lambdaBody = sortKeySelector.Body();
1756-
if (IsDqPureExpr(sortKeySelector)) {
1755+
if (IsDqCompletePureExpr(sortKeySelector)) {
17571756
if (auto maybeColTuple = lambdaBody.Maybe<TExprList>()) {
17581757
auto tuple = maybeColTuple.Cast();
17591758
sortKeyTypes.reserve(tuple.Size());
@@ -1851,7 +1850,7 @@ TExprBase DqBuildSkipStage(TExprBase node, TExprContext& ctx, IOptimizationConte
18511850
}
18521851

18531852
auto skip = node.Cast<TCoSkip>();
1854-
if (!IsDqPureExpr(skip.Count())) {
1853+
if (!IsDqCompletePureExpr(skip.Count())) {
18551854
return node;
18561855
}
18571856

@@ -2087,7 +2086,7 @@ TExprBase DqRewriteLengthOfStageOutput(TExprBase node, TExprContext& ctx, IOptim
20872086
}
20882087

20892088
TExprBase DqBuildPureExprStage(TExprBase node, TExprContext& ctx) {
2090-
if (!IsDqPureExpr(node)) {
2089+
if (!IsDqCompletePureExpr(node)) {
20912090
return node;
20922091
}
20932092

@@ -2141,7 +2140,7 @@ TExprBase DqBuildExtendStage(TExprBase node, TExprContext& ctx) {
21412140
inputConns.push_back(conn);
21422141
inputArgs.push_back(programArg);
21432142
extendArgs.push_back(programArg);
2144-
} else if (IsDqPureExpr(arg)) {
2143+
} else if (IsDqCompletePureExpr(arg)) {
21452144
// arg is deemed to be a pure expression so leave it inside (Extend ...)
21462145
extendArgs.push_back(Build<TCoToFlow>(ctx, arg.Pos())
21472146
.Input(arg)
@@ -2193,7 +2192,7 @@ TExprBase DqBuildPrecompute(TExprBase node, TExprContext& ctx) {
21932192
connection = input.Ptr();
21942193
} else if (input.Maybe<TCoParameter>()) {
21952194
return input;
2196-
} else if (IsDqPureExpr(input)) {
2195+
} else if (IsDqCompletePureExpr(input)) {
21972196
if (input.Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::List &&
21982197
input.Ref().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Data)
21992198
{
@@ -2327,7 +2326,7 @@ TExprBase DqBuildSqlIn(TExprBase node, TExprContext& ctx, IOptimizationContext&
23272326
return node;
23282327
}
23292328

2330-
if (!IsDqPureExpr(sqlIn.Lookup())) {
2329+
if (!IsDqCompletePureExpr(sqlIn.Lookup())) {
23312330
return node;
23322331
}
23332332

@@ -2600,7 +2599,7 @@ bool DqValidateJoinInputs(const TExprBase& left, const TExprBase& right, const T
26002599
if (!IsSingleConsumerConnection(right.Cast<TDqCnUnionAll>(), parentsMap, allowStageMultiUsage)) {
26012600
return false;
26022601
}
2603-
} else if (IsDqPureExpr(right, /* isPrecomputePure */ true)) {
2602+
} else if (IsDqCompletePureExpr(right, /* isPrecomputePure */ true)) {
26042603
// pass
26052604
} else {
26062605
return false;
@@ -2674,8 +2673,8 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
26742673
}
26752674

26762675
bool useHashJoin = EHashJoinMode::Off != hashJoin
2677-
&& joinType != "Cross"sv
2678-
&& leftIsUnionAll
2676+
&& joinType != "Cross"sv
2677+
&& leftIsUnionAll
26792678
&& rightIsUnionAll;
26802679

26812680
if (DqValidateJoinInputs(join.LeftInput(), join.RightInput(), parentsMap, allowStageMultiUsage)) {

0 commit comments

Comments
 (0)