Skip to content

Commit 6d64542

Browse files
gridnevvvitlll-phill-lll
authored andcommitted
add feature flag to enable spilling nodes (ydb-platform#6895)
1 parent 69a91af commit 6d64542

File tree

7 files changed

+84
-24
lines changed

7 files changed

+84
-24
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
609609
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
610610
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
611611
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
612+
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
612613

613614
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
614615
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
536536
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
537537
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
538538

539+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
540+
539541
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
540542
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
541543

@@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
562564
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
563565
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
564566
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
567+
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
565568
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
566569
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
567570
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
172172
runtimeSettings.UseSpilling = args.WithSpilling;
173173
runtimeSettings.StatsMode = args.StatsMode;
174174

175+
if (runtimeSettings.UseSpilling) {
176+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
177+
}
178+
175179
if (args.Deadline) {
176180
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
177181
}

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
204204
request.SetStartAllOrFail(true);
205205
if (UseDataQueryPool) {
206206
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
207+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
207208
} else {
208209
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
209210
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
2525
return EOptionalFlag::Disabled;
2626
}
2727

28+
29+
ui64 ParseEnableSpillingNodes(const TString &v) {
30+
ui64 res = 0;
31+
TVector<TString> vec;
32+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
33+
for (auto& s: vec) {
34+
if (s.empty()) {
35+
throw yexception() << "Empty value item";
36+
}
37+
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
38+
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
39+
res |= ui64(value);
40+
}
41+
return res;
42+
}
43+
2844
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
2945
return flag ? flag.GetRef() : false;
3046
}
@@ -76,20 +92,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
7692
REGISTER_SETTING(*this, OptUseFinalizeByKey);
7793
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
7894
REGISTER_SETTING(*this, EnableSpillingNodes)
79-
.Parser([](const TString& v) {
80-
ui64 res = 0;
81-
TVector<TString> vec;
82-
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
83-
for (auto& s: vec) {
84-
if (s.empty()) {
85-
throw yexception() << "Empty value item";
86-
}
87-
auto value = FromStringWithDefault<NYql::TDqSettings::EEnabledSpillingNodes>(
88-
s, NYql::TDqSettings::EEnabledSpillingNodes::None);
89-
res |= ui64(value);
90-
}
91-
return res;
92-
});
95+
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });
9396

9497
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9598

@@ -146,11 +149,6 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
146149
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
147150
}
148151

149-
ui64 TKikimrSettings::GetEnabledSpillingNodes() const {
150-
return EnableSpillingNodes.Get().GetOrElse(0);
151-
}
152-
153-
154152
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
155153
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
156154
}
@@ -172,4 +170,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
172170
return std::make_shared<const TKikimrSettings>(*this);
173171
}
174172

173+
void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
174+
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
175+
}
176+
177+
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
178+
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
179+
}
180+
175181
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ struct TKikimrSettings {
9090
bool HasOptEnableOlapPushdown() const;
9191
bool HasOptEnableOlapProvideComputeSharding() const;
9292
bool HasOptUseFinalizeByKey() const;
93-
ui64 GetEnabledSpillingNodes() const;
9493

9594
EOptionalFlag GetOptPredicateExtract() const;
9695
EOptionalFlag GetUseLlvm() const;
@@ -177,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
177176
bool EnableSpillingGenericQuery = false;
178177
ui32 DefaultCostBasedOptimizationLevel = 4;
179178
bool EnableConstantFolding = true;
179+
ui64 DefaultEnableSpillingNodes = 0;
180+
181+
void SetDefaultEnabledSpillingNodes(const TString& node);
182+
ui64 GetEnabledSpillingNodes() const;
180183
};
181184

182185
}

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,26 +32,68 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(ui64 reasonableTreshold) {
36+
NKikimrConfig::TAppConfig appCfg;
37+
38+
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
39+
rm->SetMkqlLightProgramMemoryLimit(100);
40+
rm->SetMkqlHeavyProgramMemoryLimit(300);
41+
rm->SetReasonableSpillingTreshold(reasonableTreshold);
42+
appCfg.MutableTableServiceConfig()->SetEnableQueryServiceSpilling(true);
43+
44+
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
45+
46+
spilling->SetEnable(true);
47+
spilling->SetRoot("./spilling/");
48+
49+
return appCfg;
50+
}
51+
52+
3553
} // anonymous namespace
3654

3755
Y_UNIT_TEST_SUITE(KqpScanSpilling) {
3856

39-
Y_UNIT_TEST(SpillingPragmaParseError) {
57+
Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
58+
ui64 reasonableTreshold = EnabledSpilling ? 100 : 200_MB;
4059
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
41-
TKikimrRunner kikimr(AppCfg());
60+
TKikimrRunner kikimr(AppCfgLowComputeLimits(reasonableTreshold));
4261

4362
auto db = kikimr.GetQueryClient();
63+
64+
for (ui32 i = 0; i < 300; ++i) {
65+
auto result = db.ExecuteQuery(Sprintf(R"(
66+
--!syntax_v1
67+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
68+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
69+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
70+
}
71+
4472
auto query = R"(
4573
--!syntax_v1
46-
PRAGMA ydb.EnableSpillingNodes="GraceJoin1";
74+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
4775
select t1.Key, t1.Value, t2.Key, t2.Value
4876
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
4977
order by t1.Value
5078
)";
5179

5280
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
5381
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
54-
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::GENERIC_ERROR, planres.GetIssues().ToString());
82+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
83+
84+
Cerr << planres.GetStats()->GetAst() << Endl;
85+
86+
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
87+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
88+
89+
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
90+
if (EnabledSpilling) {
91+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() > 0);
92+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() > 0);
93+
} else {
94+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() == 0);
95+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() == 0);
96+
}
5597
}
5698

5799
Y_UNIT_TEST(SelfJoinQueryService) {

0 commit comments

Comments
 (0)