Skip to content

Commit 0e03b2f

Browse files
split SpillingEngine and channels spilling (#6406)
1 parent 40256f3 commit 0e03b2f

File tree

4 files changed

+20
-10
lines changed

4 files changed

+20
-10
lines changed

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,12 @@ TDqConfiguration::TDqConfiguration() {
8080
REGISTER_SETTING(*this, SpillingEngine)
8181
.Parser([](const TString& v) {
8282
return FromString<TDqSettings::ESpillingEngine>(v);
83-
})
84-
.ValueSetter([this](const TString&, TDqSettings::ESpillingEngine value) {
85-
SpillingEngine = value;
86-
if (value != TDqSettings::ESpillingEngine::Disable) {
83+
});
84+
85+
REGISTER_SETTING(*this, EnableSpillingInChannels)
86+
.ValueSetter([this](const TString&, bool value) {
87+
EnableSpillingInChannels = value;
88+
if (value) {
8789
SplitStageOnDqReplicate = false;
8890
EnableDqReplicate = true;
8991
}

ydb/library/yql/providers/dq/common/yql_dq_settings.h

+11-3
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ struct TDqSettings {
6666
static constexpr ui64 MaxAttachmentsSize = 2_GB;
6767
static constexpr bool SplitStageOnDqReplicate = true;
6868
static constexpr ui64 EnableSpillingNodes = 0;
69+
static constexpr bool EnableSpillingInChannels = false;
6970
};
7071

7172
using TPtr = std::shared_ptr<TDqSettings>;
@@ -138,6 +139,7 @@ struct TDqSettings {
138139
NCommon::TConfSetting<bool, false> SplitStageOnDqReplicate;
139140

140141
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
142+
NCommon::TConfSetting<bool, false> EnableSpillingInChannels;
141143

142144
NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
143145
NCommon::TConfSetting<bool, false> DisableCheckpoints;
@@ -193,6 +195,7 @@ struct TDqSettings {
193195
SAVE_SETTING(ExportStats);
194196
SAVE_SETTING(TaskRunnerStats);
195197
SAVE_SETTING(SpillingEngine);
198+
SAVE_SETTING(EnableSpillingInChannels);
196199
SAVE_SETTING(DisableCheckpoints);
197200
#undef SAVE_SETTING
198201
}
@@ -219,13 +222,18 @@ struct TDqSettings {
219222
}
220223
}
221224

222-
bool IsSpillingEnabled() const {
225+
bool IsSpillingEngineEnabled() const {
223226
return SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != ESpillingEngine::Disable;
224227
}
225228

229+
bool IsSpillingInChannelsEnabled() const {
230+
if (!IsSpillingEngineEnabled()) return false;
231+
return EnableSpillingInChannels.Get().GetOrElse(TDqSettings::TDefault::EnableSpillingInChannels) != false;
232+
}
233+
226234
ui64 GetEnabledSpillingNodes() const {
227-
if (!IsSpillingEnabled()) return 0;
228-
return EnableSpillingNodes.Get().GetOrElse(0);
235+
if (!IsSpillingEngineEnabled()) return 0;
236+
return EnableSpillingNodes.Get().GetOrElse(TDqSettings::TDefault::EnableSpillingNodes);
229237
}
230238

231239
bool IsDqReplicateEnabled(const TTypeAnnotationContext& typesCtx) const {

ydb/library/yql/providers/dq/planner/execution_planner.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ namespace NYql::NDqs {
433433

434434
bool enableSpilling = false;
435435
if (task.Outputs.size() > 1) {
436-
enableSpilling = Settings->IsSpillingEnabled();
436+
enableSpilling = Settings->IsSpillingInChannelsEnabled();
437437
}
438438
for (auto& output : task.Outputs) {
439439
FillOutputDesc(*taskDesc.AddOutputs(), output, enableSpilling);
@@ -448,7 +448,7 @@ namespace NYql::NDqs {
448448
taskMeta.SetStageId(publicId);
449449
taskDesc.MutableMeta()->PackFrom(taskMeta);
450450
taskDesc.SetStageId(stageId);
451-
taskDesc.SetEnableSpilling(Settings->IsSpillingEnabled());
451+
taskDesc.SetEnableSpilling(Settings->GetEnabledSpillingNodes());
452452

453453
if (Settings->DisableLLVMForBlockStages.Get().GetOrElse(true)) {
454454
auto& stage = TasksGraph.GetStageInfo(task.StageId).Meta.Stage;

ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class TDqExecutionValidator {
147147
, State_(state)
148148
, CheckSelfJoin_(!TypeCtx_.ForceDq
149149
&& !State_->Settings->SplitStageOnDqReplicate.Get().GetOrElse(TDqSettings::TDefault::SplitStageOnDqReplicate)
150-
&& !State_->Settings->IsSpillingEnabled())
150+
&& !State_->Settings->IsSpillingInChannelsEnabled())
151151
{}
152152

153153
bool ValidateDqExecution(const TExprNode& node) {

0 commit comments

Comments
 (0)