Skip to content

Commit 7cf9db7

Browse files
authored
add feature flag to enable spilling nodes (#6895)
1 parent 6064125 commit 7cf9db7

File tree

8 files changed

+100
-20
lines changed

8 files changed

+100
-20
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -608,6 +608,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
608608
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
609609
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
610610
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
611+
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
611612

612613
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
613614
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
534534
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
535535
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
536536

537+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
538+
537539
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
538540
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
539541

@@ -556,6 +558,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
556558
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
557559
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
558560
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
561+
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
559562
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
560563
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
561564
TableServiceConfig.GetEnableConstantFolding() != enableConstantFolding ||

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
166166
runtimeSettings.UseSpilling = args.WithSpilling;
167167
runtimeSettings.StatsMode = args.StatsMode;
168168

169+
if (runtimeSettings.UseSpilling) {
170+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
171+
}
172+
169173
if (args.Deadline) {
170174
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
171175
}

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
194194
request.SetStartAllOrFail(true);
195195
if (UseDataQueryPool) {
196196
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
197+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
197198
} else {
198199
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
199200
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
2525
return EOptionalFlag::Disabled;
2626
}
2727

28+
29+
ui64 ParseEnableSpillingNodes(const TString &v) {
30+
ui64 res = 0;
31+
TVector<TString> vec;
32+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
33+
for (auto& s: vec) {
34+
if (s.empty()) {
35+
throw yexception() << "Empty value item";
36+
}
37+
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
38+
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
39+
res |= ui64(value);
40+
}
41+
return res;
42+
}
43+
2844
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
2945
return flag ? flag.GetRef() : false;
3046
}
@@ -73,20 +89,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
7389
REGISTER_SETTING(*this, OptUseFinalizeByKey);
7490
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
7591
REGISTER_SETTING(*this, EnableSpillingNodes)
76-
.Parser([](const TString& v) {
77-
ui64 res = 0;
78-
TVector<TString> vec;
79-
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
80-
for (auto& s: vec) {
81-
if (s.empty()) {
82-
throw yexception() << "Empty value item";
83-
}
84-
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
85-
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
86-
res |= ui64(value);
87-
}
88-
return res;
89-
});
92+
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });
9093

9194
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9295

@@ -143,11 +146,6 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
143146
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
144147
}
145148

146-
ui64 TKikimrSettings::GetEnabledSpillingNodes() const {
147-
return EnableSpillingNodes.Get().GetOrElse(0);
148-
}
149-
150-
151149
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
152150
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
153151
}
@@ -169,4 +167,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
169167
return std::make_shared<const TKikimrSettings>(*this);
170168
}
171169

170+
void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
171+
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
172+
}
173+
174+
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
175+
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
176+
}
177+
172178
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ struct TKikimrSettings {
8484
bool HasOptEnableOlapPushdown() const;
8585
bool HasOptEnableOlapProvideComputeSharding() const;
8686
bool HasOptUseFinalizeByKey() const;
87-
ui64 GetEnabledSpillingNodes() const;
8887

8988
EOptionalFlag GetOptPredicateExtract() const;
9089
EOptionalFlag GetUseLlvm() const;
@@ -167,6 +166,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
167166
bool EnableSpillingGenericQuery = false;
168167
ui32 DefaultCostBasedOptimizationLevel = 3;
169168
bool EnableConstantFolding = true;
169+
ui64 DefaultEnableSpillingNodes = 0;
170+
171+
void SetDefaultEnabledSpillingNodes(const TString& node);
172+
ui64 GetEnabledSpillingNodes() const;
170173
};
171174

172175
}

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,70 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(ui64 reasonableTreshold) {
36+
NKikimrConfig::TAppConfig appCfg;
37+
38+
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
39+
rm->SetMkqlLightProgramMemoryLimit(100);
40+
rm->SetMkqlHeavyProgramMemoryLimit(300);
41+
rm->SetReasonableSpillingTreshold(reasonableTreshold);
42+
appCfg.MutableTableServiceConfig()->SetEnableQueryServiceSpilling(true);
43+
44+
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
45+
46+
spilling->SetEnable(true);
47+
spilling->SetRoot("./spilling/");
48+
49+
return appCfg;
50+
}
51+
52+
3553
} // anonymous namespace
3654

3755
Y_UNIT_TEST_SUITE(KqpScanSpilling) {
3856

57+
Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
58+
ui64 reasonableTreshold = EnabledSpilling ? 100 : 200_MB;
59+
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
60+
TKikimrRunner kikimr(AppCfgLowComputeLimits(reasonableTreshold));
61+
62+
auto db = kikimr.GetQueryClient();
63+
64+
for (ui32 i = 0; i < 300; ++i) {
65+
auto result = db.ExecuteQuery(Sprintf(R"(
66+
--!syntax_v1
67+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
68+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
69+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
70+
}
71+
72+
auto query = R"(
73+
--!syntax_v1
74+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
75+
select t1.Key, t1.Value, t2.Key, t2.Value
76+
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
77+
order by t1.Value
78+
)";
79+
80+
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
81+
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
82+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
83+
84+
Cerr << planres.GetStats()->GetAst() << Endl;
85+
86+
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
87+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
88+
89+
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
90+
if (EnabledSpilling) {
91+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() > 0);
92+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() > 0);
93+
} else {
94+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() == 0);
95+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() == 0);
96+
}
97+
}
98+
3999
Y_UNIT_TEST(SelfJoinQueryService) {
40100
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
41101

ydb/core/protos/table_service_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,4 +300,6 @@ message TTableServiceConfig {
300300
optional bool EnableConstantFolding = 65 [ default = true ];
301301

302302
optional bool EnableImplicitQueryParameterTypes = 66 [ default = true ];
303+
304+
optional string EnableSpillingNodes = 67 [ default = "All" ];
303305
};

0 commit comments

Comments
 (0)