Skip to content

Commit 621daf0

Browse files
committed
review fixes
1 parent 6111922 commit 621daf0

File tree

6 files changed

+26
-18
lines changed

6 files changed

+26
-18
lines changed

ydb/library/yql/dq/tasks/dq_task_program.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,11 @@ TString BuildProgram(NNodes::TCoLambda program, const TStructExprType& paramsTyp
7878
TRuntimeNode rootNode = MkqlBuildExpr(program.Body().Ref(), ctx);
7979

8080
TExploringNodeVisitor explorer;
81-
explorer.Walk(rootNode.GetNode(), typeEnv);
82-
bool wereChanges = false;
83-
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(spillingSettings), typeEnv, true, wereChanges);
81+
if (spillingSettings) {
82+
explorer.Walk(rootNode.GetNode(), typeEnv);
83+
bool wereChanges = false;
84+
rootNode = SinglePassVisitCallables(rootNode, explorer, TSpillingTransformProvider(spillingSettings), typeEnv, true, wereChanges);
85+
}
8486

8587
TStructLiteralBuilder structBuilder(typeEnv);
8688
structBuilder.Add("Program", rootNode);

ydb/library/yql/dq/tasks/dq_task_program.h

+4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
namespace NYql::NDq {
1111

1212
struct TSpillingSettings {
13+
operator bool() const {
14+
return EnableSpillingInGraceJoin;
15+
}
16+
1317
bool EnableSpillingInGraceJoin = false;
1418
};
1519

ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp

+2-11
Original file line numberDiff line numberDiff line change
@@ -1170,23 +1170,14 @@ IComputationNode* WrapGraceJoinCommon(TCallable& callable, const TComputationNod
11701170

11711171
IComputationNode* WrapGraceJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11721172
MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
1173-
bool isSpillingAllowed = false;
1174-
if (callable.GetType()->GetName() == "GraceJoinWithSpilling") {
1175-
isSpillingAllowed = true;
1176-
}
11771173

1178-
return WrapGraceJoinCommon(callable, ctx, false, isSpillingAllowed);
1174+
return WrapGraceJoinCommon(callable, ctx, false, callable.IsSpillingSupported());
11791175
}
11801176

11811177
IComputationNode* WrapGraceSelfJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
11821178
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");
1183-
1184-
bool isSpillingAllowed = false;
1185-
if (callable.GetType()->GetName() == "GraceSelfJoinWithSpilling") {
1186-
isSpillingAllowed = true;
1187-
}
11881179

1189-
return WrapGraceJoinCommon(callable, ctx, true, isSpillingAllowed);
1180+
return WrapGraceJoinCommon(callable, ctx, true, callable.IsSpillingSupported());
11901181
}
11911182

11921183
}

ydb/library/yql/minikql/mkql_node.h

+4
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,10 @@ friend class TNode;
10751075
UniqueId = uniqueId;
10761076
}
10771077

1078+
bool IsSpillingSupported() const {
1079+
return TStringBuf(GetType()->GetName()).EndsWith("WithSpilling"_sb);
1080+
}
1081+
10781082
private:
10791083
TCallable(ui32 inputsCount, TRuntimeNode* inputs, TCallableType* type, bool validate = true);
10801084
TCallable(TRuntimeNode result, TCallableType* type, bool validate = true);

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ TDqConfiguration::TDqConfiguration() {
9898

9999
REGISTER_SETTING(*this, _MaxAttachmentsSize);
100100
REGISTER_SETTING(*this, DisableCheckpoints);
101-
REGISTER_SETTING(*this, EnableSpillingInGraceJoin);
101+
REGISTER_SETTING(*this, EnabledSpillingNodes).Parser([](const TString& v) { return FromString<EEnabledSpillingNodes>(v); });;
102102
}
103103

104104
} // namespace NYql

ydb/library/yql/providers/dq/common/yql_dq_settings.h

+10-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ struct TDqSettings {
2828
File /* "file" */,
2929
};
3030

31+
enum class EEnabledSpillingNodes {
32+
None,
33+
OnlyGraceJoin,
34+
All,
35+
};
36+
3137
struct TDefault {
3238
static constexpr ui32 MaxTasksPerStage = 20U;
3339
static constexpr ui32 MaxTasksPerOperation = 70U;
@@ -60,7 +66,7 @@ struct TDqSettings {
6066
static constexpr ui32 MaxDPccpDPTableSize = 16400U;
6167
static constexpr ui64 MaxAttachmentsSize = 2_GB;
6268
static constexpr bool SplitStageOnDqReplicate = true;
63-
static constexpr bool EnableSpillingInGraceJoin = false;
69+
static constexpr EEnabledSpillingNodes EnabledSpillingNodes = EEnabledSpillingNodes::None;
6470
};
6571

6672
using TPtr = std::shared_ptr<TDqSettings>;
@@ -132,7 +138,7 @@ struct TDqSettings {
132138
NCommon::TConfSetting<bool, false> DisableLLVMForBlockStages;
133139
NCommon::TConfSetting<bool, false> SplitStageOnDqReplicate;
134140

135-
NCommon::TConfSetting<bool, false> EnableSpillingInGraceJoin;
141+
NCommon::TConfSetting<EEnabledSpillingNodes, false> EnabledSpillingNodes;
136142

137143
NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
138144
NCommon::TConfSetting<bool, false> DisableCheckpoints;
@@ -219,7 +225,8 @@ struct TDqSettings {
219225
}
220226

221227
bool IsSpillingInGraceJoinEnabled() const {
222-
return IsSpillingEnabled() && EnableSpillingInGraceJoin.Get().GetOrElse(false);
228+
auto enabledNodes = EnabledSpillingNodes.Get().GetOrElse(TDqSettings::TDefault::EnabledSpillingNodes);
229+
return IsSpillingEnabled() && (enabledNodes == EEnabledSpillingNodes::OnlyGraceJoin || enabledNodes == EEnabledSpillingNodes::All);
223230
}
224231

225232
bool IsDqReplicateEnabled(const TTypeAnnotationContext& typesCtx) const {

0 commit comments

Comments
 (0)