Skip to content

Commit be493a4

Browse files
Merge 24103ed into f837701
2 parents f837701 + 24103ed commit be493a4

File tree

67 files changed

+1084
-1119
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1084
-1119
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
609609
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
610610
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
611611
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
612+
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
612613

613614
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
614615
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
536536
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
537537
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
538538

539+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
540+
539541
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
540542
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
541543

@@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
562564
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
563565
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
564566
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
567+
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
565568
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
566569
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
567570
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
172172
runtimeSettings.UseSpilling = args.WithSpilling;
173173
runtimeSettings.StatsMode = args.StatsMode;
174174

175+
if (runtimeSettings.UseSpilling) {
176+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
177+
}
178+
175179
if (args.Deadline) {
176180
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
177181
}

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
196196
request.SetStartAllOrFail(true);
197197
if (UseDataQueryPool) {
198198
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
199+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
199200
} else {
200201
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
201202
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -815,10 +815,11 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
815815

816816
TVector<TExprBase> fakeReads;
817817
auto paramsType = NDq::CollectParameters(programLambda, ctx);
818+
NDq::TSpillingSettings spillingSettings{SessionCtx->Config().GetEnabledSpillingNodes()};
818819
lambda = NDq::BuildProgram(
819820
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
820821
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
821-
ctx, fakeReads);
822+
ctx, fakeReads, spillingSettings);
822823

823824
NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
824825
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include <ydb/core/protos/config.pb.h>
44
#include <ydb/core/protos/table_service_config.pb.h>
55
#include <util/generic/size_literals.h>
6+
#include <util/string/split.h>
7+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
68

79
namespace NYql {
810

@@ -23,6 +25,21 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
2325
return EOptionalFlag::Disabled;
2426
}
2527

28+
29+
ui64 ParseEnableSpillingNodes(const TString &v) {
30+
ui64 res = 0;
31+
TVector<TString> vec;
32+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
33+
for (auto& s: vec) {
34+
if (s.empty()) {
35+
throw yexception() << "Empty value item";
36+
}
37+
auto value = FromString<NYql::TDqSettings::EEnabledSpillingNodes>(s);
38+
res |= ui64(value);
39+
}
40+
return res;
41+
}
42+
2643
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
2744
return flag ? flag.GetRef() : false;
2845
}
@@ -73,6 +90,8 @@ TKikimrConfiguration::TKikimrConfiguration() {
7390

7491
REGISTER_SETTING(*this, OptUseFinalizeByKey);
7592
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
93+
REGISTER_SETTING(*this, EnableSpillingNodes)
94+
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });
7695

7796
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
7897

@@ -129,7 +148,6 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
129148
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
130149
}
131150

132-
133151
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
134152
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
135153
}
@@ -151,4 +169,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
151169
return std::make_shared<const TKikimrSettings>(*this);
152170
}
153171

172+
void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
173+
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
174+
}
175+
176+
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
177+
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
178+
}
179+
154180
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ struct TKikimrSettings {
5959
NCommon::TConfSetting<TString, false> OptJoinAlgoHints;
6060
NCommon::TConfSetting<TString, false> OptJoinOrderHints;
6161

62+
6263
/* Disable optimizer rules */
6364
NCommon::TConfSetting<bool, false> OptDisableTopSort;
6465
NCommon::TConfSetting<bool, false> OptDisableSqlInToJoin;
@@ -175,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
175176
bool EnableSpillingGenericQuery = false;
176177
ui32 DefaultCostBasedOptimizationLevel = 4;
177178
bool EnableConstantFolding = true;
179+
ui64 DefaultEnableSpillingNodes = 0;
180+
181+
void SetDefaultEnabledSpillingNodes(const TString& node);
182+
ui64 GetEnabledSpillingNodes() const;
178183
};
179184

180185
}

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <library/cpp/monlib/service/pages/templates.h>
4747
#include <library/cpp/resource/resource.h>
4848

49+
#include <util/folder/dirut.h>
4950

5051
namespace NKikimr::NKqp {
5152

@@ -236,9 +237,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
236237
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());
237238

238239
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
240+
TString spillingRoot = cfg.GetRoot();
241+
if (spillingRoot.empty()) {
242+
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
243+
MakeDirIfNotExist(spillingRoot);
244+
}
245+
239246
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
240247
NYql::NDq::TFileSpillingServiceConfig{
241-
.Root = cfg.GetRoot(),
248+
.Root = spillingRoot,
242249
.MaxTotalSize = cfg.GetMaxTotalSize(),
243250
.MaxFileSize = cfg.GetMaxFileSize(),
244251
.MaxFilePartSize = cfg.GetMaxFilePartSize(),

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -777,8 +777,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
777777
stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink);
778778

779779
auto paramsType = CollectParameters(stage, ctx);
780+
NDq::TSpillingSettings spillingSettings{Config->GetEnabledSpillingNodes()};
780781
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
781-
ctx, {});
782+
ctx, {}, spillingSettings);
782783

783784
auto& programProto = *stageProto.MutableProgram();
784785
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);

ydb/core/kqp/ut/olap/clickbench_ut.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,10 @@ Y_UNIT_TEST_SUITE(KqpOlapClickbench) {
166166
GROUP BY RegionID
167167
ORDER BY c DESC
168168
LIMIT 10
169-
)");
169+
)")
170170
//.SetExpectedReply("[[[\"40999\"];[4];1u];[[\"40998\"];[3];1u];[[\"40997\"];[2];1u]]")
171-
// .SetExpectedReadNodeType("TableFullScan");
172-
// .SetExpectedReadNodeType("Aggregate-TableFullScan");
171+
.SetExpectedReadNodeType("TableFullScan");
172+
173173
q9.FillExpectedAggregationGroupByPlanOptions();
174174

175175
TAggregationTestCase q12;
@@ -214,8 +214,8 @@ Y_UNIT_TEST_SUITE(KqpOlapClickbench) {
214214
ORDER BY c DESC
215215
LIMIT 10;
216216
)")
217-
.AddExpectedPlanOptions("KqpOlapFilter");
218-
// .SetExpectedReadNodeType("TableFullScan");
217+
.AddExpectedPlanOptions("KqpOlapFilter")
218+
.SetExpectedReadNodeType("TableFullScan");
219219
q22.FillExpectedAggregationGroupByPlanOptions();
220220

221221
TAggregationTestCase q39;

ydb/core/kqp/ut/spilling/kqp_scan_spilling_ut.cpp

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,24 @@ NKikimrConfig::TAppConfig AppCfg() {
3232
return appCfg;
3333
}
3434

35+
NKikimrConfig::TAppConfig AppCfgLowComputeLimits(ui64 reasonableTreshold) {
36+
NKikimrConfig::TAppConfig appCfg;
37+
38+
auto* rm = appCfg.MutableTableServiceConfig()->MutableResourceManager();
39+
rm->SetMkqlLightProgramMemoryLimit(100);
40+
rm->SetMkqlHeavyProgramMemoryLimit(300);
41+
rm->SetReasonableSpillingTreshold(reasonableTreshold);
42+
appCfg.MutableTableServiceConfig()->SetEnableQueryServiceSpilling(true);
43+
44+
auto* spilling = appCfg.MutableTableServiceConfig()->MutableSpillingServiceConfig()->MutableLocalFileConfig();
45+
46+
spilling->SetEnable(true);
47+
spilling->SetRoot("./spilling/");
48+
49+
return appCfg;
50+
}
51+
52+
3553
} // anonymous namespace
3654

3755
Y_UNIT_TEST_SUITE(KqpScanSpilling) {
@@ -54,6 +72,48 @@ Y_UNIT_TEST(SpillingPragmaParseError) {
5472
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::GENERIC_ERROR, planres.GetIssues().ToString());
5573
}
5674

75+
Y_UNIT_TEST_TWIN(SpillingInRuntimeNodes, EnabledSpilling) {
76+
ui64 reasonableTreshold = EnabledSpilling ? 100 : 200_MB;
77+
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
78+
TKikimrRunner kikimr(AppCfgLowComputeLimits(reasonableTreshold));
79+
80+
auto db = kikimr.GetQueryClient();
81+
82+
for (ui32 i = 0; i < 300; ++i) {
83+
auto result = db.ExecuteQuery(Sprintf(R"(
84+
--!syntax_v1
85+
REPLACE INTO `/Root/KeyValue` (Key, Value) VALUES (%d, "%s")
86+
)", i, TString(200000 + i, 'a' + (i % 26)).c_str()), NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
87+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
88+
}
89+
90+
auto query = R"(
91+
--!syntax_v1
92+
PRAGMA ydb.EnableSpillingNodes="GraceJoin";
93+
select t1.Key, t1.Value, t2.Key, t2.Value
94+
from `/Root/KeyValue` as t1 full join `/Root/KeyValue` as t2 on t1.Value = t2.Value
95+
order by t1.Value
96+
)";
97+
98+
auto explainMode = NYdb::NQuery::TExecuteQuerySettings().ExecMode(NYdb::NQuery::EExecMode::Explain);
99+
auto planres = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), explainMode).ExtractValueSync();
100+
UNIT_ASSERT_VALUES_EQUAL_C(planres.GetStatus(), EStatus::SUCCESS, planres.GetIssues().ToString());
101+
102+
Cerr << planres.GetStats()->GetAst() << Endl;
103+
104+
auto result = db.ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), NYdb::NQuery::TExecuteQuerySettings()).ExtractValueSync();
105+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
106+
107+
TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
108+
if (EnabledSpilling) {
109+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() > 0);
110+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() > 0);
111+
} else {
112+
UNIT_ASSERT(counters.SpillingWriteBlobs->Val() == 0);
113+
UNIT_ASSERT(counters.SpillingReadBlobs->Val() == 0);
114+
}
115+
}
116+
57117
Y_UNIT_TEST(SelfJoinQueryService) {
58118
Cerr << "cwd: " << NFs::CurrentWorkingDirectory() << Endl;
59119

ydb/core/protos/table_service_config.proto

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ message TTableServiceConfig {
5555

5656
message TSpillingServiceConfig {
5757
message TLocalFileConfig {
58-
optional bool Enable = 1 [default = false];
59-
optional string Root = 2 [default = "/tmp/kikimr_spilling/"];
58+
optional bool Enable = 1 [default = true];
59+
optional string Root = 2 [default = ""];
6060
optional uint64 MaxTotalSize = 3 [default = 21474836480]; // 20 GiB
6161
optional uint64 MaxFileSize = 4 [default = 5368709120]; // 5 GiB
6262
optional uint64 MaxFilePartSize = 5 [default = 104857600]; // 100 MB
@@ -309,4 +309,5 @@ message TTableServiceConfig {
309309

310310
optional bool EnableImplicitQueryParameterTypes = 66 [ default = false ];
311311

312+
optional string EnableSpillingNodes = 67 [ default = "All" ];
312313
};

ydb/library/yql/core/common_opt/yql_co_flow1.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,7 +1409,7 @@ TExprNode::TPtr OptimizeFlatMap(const TExprNode::TPtr& node, TExprContext& ctx,
14091409
return FuseFlatMapOverByKey<false>(*node, ctx);
14101410
}
14111411

1412-
if (node->Head().IsCallable({"PartitionByKey", "PartitionsByKeys"})) {
1412+
if (node->Head().IsCallable({"PartitionByKey", "PartitionsByKeys", "ShuffleByKeys"})) {
14131413
return FuseFlatMapOverByKey<true>(*node, ctx);
14141414
}
14151415
}
@@ -1477,7 +1477,7 @@ TExprNode::TPtr OptimizeFlatMap(const TExprNode::TPtr& node, TExprContext& ctx,
14771477
{
14781478
auto canPush = [&](const auto& child) {
14791479
// we push FlatMap over Extend only if it can later be fused with child
1480-
return child->IsCallable({Ordered ? "OrderedFlatMap" : "FlatMap", "GroupByKey", "CombineByKey", "PartitionByKey", "PartitionsByKeys",
1480+
return child->IsCallable({Ordered ? "OrderedFlatMap" : "FlatMap", "GroupByKey", "CombineByKey", "PartitionByKey", "PartitionsByKeys", "ShuffleByKeys",
14811481
"ListIf", "FlatListIf", "AsList", "ToList"}) && optCtx.IsSingleUsage(*child);
14821482
};
14831483
if (AllOf(node->Head().ChildrenList(), canPush)) {

ydb/library/yql/core/yql_opt_utils.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1813,7 +1813,8 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
18131813
|| TCoForwardList::Match(node.Get())
18141814
|| TCoApply::Match(node.Get())
18151815
|| TCoSwitch::Match(node.Get())
1816-
|| node->IsCallable("DqReplicate");
1816+
|| node->IsCallable("DqReplicate")
1817+
|| TCoPartitionsByKeys::Match(node.Get());
18171818
}
18181819
);
18191820

@@ -1851,6 +1852,11 @@ TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, con
18511852
return node;
18521853
}
18531854
}
1855+
} else if (TCoPartitionsByKeys::Match(candidate.Get())) {
1856+
const auto handlerChild = candidate->Child(TCoPartitionsByKeys::idx_ListHandlerLambda);
1857+
if (auto node = FindNonYieldTransparentNodeImpl(handlerChild->TailPtr(), udfSupportsYield, TNodeSet{&handlerChild->Head().Head()})) {
1858+
return node;
1859+
}
18541860
}
18551861
}
18561862
return {};

0 commit comments

Comments
 (0)