Skip to content

Commit 4687358

Browse files
Merge 9269127 into 04ee172
2 parents 04ee172 + 9269127 commit 4687358

File tree

117 files changed

+1879
-950
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

117 files changed

+1879
-950
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
609609
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
610610
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
611611
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
612+
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
612613

613614
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
614615
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

+3
Original file line number | Diff line number | Diff line change
@@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
536536
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
537537
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();
538538

539+
TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();
540+
539541
TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
540542
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");
541543

@@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
562564
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
563565
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
564566
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
567+
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
565568
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
566569
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
567570
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

+7-8
Original file line number | Diff line number | Diff line change
@@ -14,15 +14,13 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1414
, std::shared_ptr<IKqpNodeState> state
1515
, TIntrusivePtr<NRm::TTxState> tx
1616
, TIntrusivePtr<NRm::TTaskState> task
17-
, ui64 limit
18-
, ui64 reasonableSpillingTreshold)
17+
, ui64 limit)
1918
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
2019
, ResourceManager(std::move(resourceManager))
2120
, MemoryPool(memoryPool)
2221
, State(std::move(state))
2322
, Tx(std::move(tx))
2423
, Task(std::move(task))
25-
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
2624
{
2725
}
2826

@@ -57,7 +55,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
5755
}
5856

5957
// Override that reports whether it is reasonable to use spilling.
// Diff view: the line after the "-" marker is the pre-commit body, the line
// after the "+" marker is the post-commit body.
bool IsReasonableToUseSpilling() const override {
60-
// Removed: compared the transaction's extra allocated memory size with ReasonableSpillingTreshold.
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
58+
// Added: the decision is delegated to the task state.
return Task->IsReasonableToStartSpilling();
6159
}
6260

6361
TString MemoryConsumptionDetails() const override {
@@ -88,7 +86,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
8886
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
8987
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
9088
std::atomic<ui64> MinChannelBufferSize = 0;
91-
std::atomic<ui64> ReasonableSpillingTreshold = 0;
9289
std::atomic<ui64> MinMemAllocSize = 8_MB;
9390
std::atomic<ui64> MinMemFreeSize = 32_MB;
9491

@@ -109,7 +106,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
109106
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
110107
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
111108
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
112-
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
113109
MinMemAllocSize.store(config.GetMinMemAllocSize());
114110
MinMemFreeSize.store(config.GetMinMemFreeSize());
115111
}
@@ -164,14 +160,17 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
164160
std::move(args.State),
165161
std::move(args.TxInfo),
166162
std::move(task),
167-
limit,
168-
ReasonableSpillingTreshold.load());
163+
limit);
169164

170165
auto runtimeSettings = args.RuntimeSettings;
171166
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
172167
runtimeSettings.UseSpilling = args.WithSpilling;
173168
runtimeSettings.StatsMode = args.StatsMode;
174169

170+
if (runtimeSettings.UseSpilling) {
171+
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
172+
}
173+
175174
if (args.Deadline) {
176175
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
177176
}

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

+1
Original file line number | Diff line number | Diff line change
@@ -124,6 +124,7 @@ struct IKqpNodeComputeActorFactory {
124124
const TInstant& Deadline;
125125
const bool ShareMailbox;
126126
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
127+
127128
TComputeStagesWithScan* ComputesByStages = nullptr;
128129
std::shared_ptr<IKqpNodeState> State = nullptr;
129130
TComputeActorSchedulingOptions SchedulingOptions = {};

ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@ using namespace NYql::NDq;
1414

1515
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
1616
public:
17-
// Constructor of TKqpTaskRunnerExecutionContext.
// Diff view: lines after "-" markers are the old form, lines after "+" markers the new form.
// Old form: took txId, withSpilling and one wake-up callback; txId and the callback went to the base class.
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
18-
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
17+
// New form: additionally accepts an error callback; both callbacks are moved into the base class.
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
18+
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
1919
// Unchanged: the spilling flag is stored in WithSpilling_.
, WithSpilling_(withSpilling)
2020
{
2121
}

ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp

+3-2
Original file line number | Diff line number | Diff line change
@@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
7272
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
7373
SetTaskRunner(taskRunner);
7474

75-
auto wakeup = [this]{ ContinueExecute(); };
75+
auto wakeupCallback = [this]{ ContinueExecute(); };
76+
auto errorCallback = [this](const TString& error){ SendError(error); };
7677
try {
77-
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
78+
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
7879
} catch (const NMiniKQL::TKqpEnsureFail& e) {
7980
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
8081
return;

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -243,7 +243,8 @@ void TKqpScanComputeActor::DoBootstrap() {
243243
TBase::SetTaskRunner(taskRunner);
244244

245245
auto wakeup = [this] { ContinueExecute(); };
246-
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
246+
auto errorCallback = [this](const TString& error){ SendError(error); };
247+
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));
247248

248249
ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
249250
ScanData = &ComputeCtx.GetTableScan(0);

ydb/core/kqp/executer_actor/kqp_planner.cpp

+9-2
Original file line number | Diff line number | Diff line change
@@ -204,6 +204,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
204204
request.SetStartAllOrFail(true);
205205
if (UseDataQueryPool) {
206206
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
207+
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
207208
} else {
208209
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
209210
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
@@ -432,8 +433,14 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
432433
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
433434
NYql::NDq::TComputeRuntimeSettings settings;
434435
if (!TxInfo) {
436+
double memoryPoolPercent = 100;
437+
if (UserRequestContext->PoolConfig.has_value()) {
438+
memoryPoolPercent = UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode;
439+
}
440+
435441
TxInfo = MakeIntrusive<NRm::TTxState>(
436-
TxId, TInstant::Now(), ResourceManager_->GetCounters());
442+
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
443+
UserRequestContext->PoolId, memoryPoolPercent, Database);
437444
}
438445

439446
auto startResult = CaFactory_->CreateKqpComputeActor({
@@ -454,7 +461,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
454461
.StatsMode = GetDqStatsMode(StatsMode),
455462
.Deadline = Deadline,
456463
.ShareMailbox = (computeTasksSize <= 1),
457-
.RlPath = Nothing()
464+
.RlPath = Nothing(),
458465
});
459466

460467
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {

ydb/core/kqp/node_service/kqp_node_service.cpp

+3-1
Original file line number | Diff line number | Diff line change
@@ -200,7 +200,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
200200
}
201201

202202
TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
203-
txId, TInstant::Now(), ResourceManager_->GetCounters());
203+
txId, TInstant::Now(), ResourceManager_->GetCounters(),
204+
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
205+
msg.GetDatabase());
204206

205207
const ui32 tasksCount = msg.GetTasks().size();
206208
for (auto& dqTask: *msg.MutableTasks()) {

ydb/core/kqp/opt/logical/kqp_opt_log.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
141141
true, // defaultWatermarksMode
142142
true); // syncActor
143143
} else {
144-
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
144+
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
145+
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
145146
}
146147
if (output) {
147148
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
252252
}
253253

254254
// Rewrites `node` through ExpandAggregatePeepholeImpl, records the rewrite with
// DumpAppliedRule under the name "ExpandAggregatePhase", and returns the result as TExprBase.
// Diff view: the line after the "-" marker is the old call, lines after "+" markers are the new code.
TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
255-
// Removed: call without a spilling argument.
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
255+
// Added: spilling settings are built from the configured enabled-spilling-nodes mask ...
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
256+
// ... and the aggregation-spilling flag is passed as an extra trailing argument.
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
256257
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
257258
return TExprBase(output);
258259
}

ydb/core/kqp/provider/yql_kikimr_exec.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -815,10 +815,11 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T
815815

816816
TVector<TExprBase> fakeReads;
817817
auto paramsType = NDq::CollectParameters(programLambda, ctx);
818+
NDq::TSpillingSettings spillingSettings{SessionCtx->Config().GetEnabledSpillingNodes()};
818819
lambda = NDq::BuildProgram(
819820
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
820821
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
821-
ctx, fakeReads);
822+
ctx, fakeReads, spillingSettings);
822823

823824
NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
824825
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());

ydb/core/kqp/provider/yql_kikimr_settings.cpp

+29-2
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,8 @@
33
#include <ydb/core/protos/config.pb.h>
44
#include <ydb/core/protos/table_service_config.pb.h>
55
#include <util/generic/size_literals.h>
6+
#include <util/string/split.h>
7+
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
68

79
namespace NYql {
810

@@ -23,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
2325
return EOptionalFlag::Disabled;
2426
}
2527

28+
29+
// Parses a delimited list of spilling-node names into a single ui64 bitmask.
// The input is split on any of the characters ',', ';', '|' or ' '. Each item is
// converted with FromString<NDq::EEnabledSpillingNodes> and OR-ed into the result.
// Throws yexception ("Empty value item") when an item is empty.
// NOTE(review): an empty input string, or two adjacent separators, presumably
// produce an empty item and therefore throw — confirm against StringSplitter semantics.
ui64 ParseEnableSpillingNodes(const TString &v) {
30+
ui64 res = 0;
31+
TVector<TString> vec;
32+
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
33+
for (auto& s: vec) {
34+
// Empty items are rejected explicitly instead of being skipped.
if (s.empty()) {
35+
throw yexception() << "Empty value item";
36+
}
37+
38+
// NOTE(review): FromString presumably throws on an unknown name — confirm.
auto value = FromString<NDq::EEnabledSpillingNodes>(s);
39+
// Accumulate the enum value as a bit flag.
res |= ui64(value);
40+
}
41+
return res;
42+
}
43+
2644
// Returns the boolean held by `flag`, or false when `flag` holds no value.
static inline bool GetFlagValue(const TMaybe<bool>& flag) {
2745
return flag ? flag.GetRef() : false;
2846
}
@@ -73,6 +91,8 @@ TKikimrConfiguration::TKikimrConfiguration() {
7391

7492
REGISTER_SETTING(*this, OptUseFinalizeByKey);
7593
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
94+
REGISTER_SETTING(*this, EnableSpillingNodes)
95+
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });
7696

7797
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
7898

@@ -126,10 +146,9 @@ bool TKikimrSettings::HasOptEnableOlapProvideComputeSharding() const {
126146
}
127147

128148
// Reports whether the OptUseFinalizeByKey setting is considered enabled.
// Diff view: the line after the "-" marker is the old body, the line after the "+" marker the new one.
bool TKikimrSettings::HasOptUseFinalizeByKey() const {
129-
// Removed: mapped the setting through GetOptionalFlagValue and compared with Disabled.
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
149+
// Added: an unset setting now falls back to true via GetOrElse(true).
// NOTE(review): GetFlagValue returns bool, so this compares a bool with
// EOptionalFlag::Disabled; presumably that relies on Disabled being 0 — confirm,
// and consider returning the GetOrElse(true) result directly.
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
130150
}
131151

132-
133152
// Returns the OptEnablePredicateExtract setting converted to EOptionalFlag by GetOptionalFlagValue.
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
134153
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
135154
}
@@ -151,4 +170,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
151170
return std::make_shared<const TKikimrSettings>(*this);
152171
}
153172

173+
// Parses `node` with ParseEnableSpillingNodes and stores the resulting bitmask in
// DefaultEnableSpillingNodes. There is no error handling here, so any exception
// raised by the parser propagates to the caller.
void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
174+
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
175+
}
176+
177+
// Returns the effective spilling-nodes bitmask: the EnableSpillingNodes setting
// when it has a value, otherwise DefaultEnableSpillingNodes.
ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
178+
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
179+
}
180+
154181
}

ydb/core/kqp/provider/yql_kikimr_settings.h

+5
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@ struct TKikimrSettings {
5858
NCommon::TConfSetting<TString, false> OptCardinalityHints;
5959
NCommon::TConfSetting<TString, false> OptJoinAlgoHints;
6060
NCommon::TConfSetting<TString, false> OptJoinOrderHints;
61+
NCommon::TConfSetting<TString, false> OverrideStatistics;
6162

6263
/* Disable optimizer rules */
6364
NCommon::TConfSetting<bool, false> OptDisableTopSort;
@@ -175,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
175176
bool EnableSpillingGenericQuery = false;
176177
ui32 DefaultCostBasedOptimizationLevel = 4;
177178
bool EnableConstantFolding = true;
179+
ui64 DefaultEnableSpillingNodes = 0;
180+
181+
void SetDefaultEnabledSpillingNodes(const TString& node);
182+
ui64 GetEnabledSpillingNodes() const;
178183
};
179184

180185
}

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

+8-1
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@
4646
#include <library/cpp/monlib/service/pages/templates.h>
4747
#include <library/cpp/resource/resource.h>
4848

49+
#include <util/folder/dirut.h>
4950

5051
namespace NKikimr::NKqp {
5152

@@ -236,9 +237,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
236237
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());
237238

238239
if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
240+
TString spillingRoot = cfg.GetRoot();
241+
if (spillingRoot.empty()) {
242+
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
243+
MakeDirIfNotExist(spillingRoot);
244+
}
245+
239246
SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
240247
NYql::NDq::TFileSpillingServiceConfig{
241-
.Root = cfg.GetRoot(),
248+
.Root = spillingRoot,
242249
.MaxTotalSize = cfg.GetMaxTotalSize(),
243250
.MaxFileSize = cfg.GetMaxFileSize(),
244251
.MaxFilePartSize = cfg.GetMaxFilePartSize(),

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -777,8 +777,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
777777
stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink);
778778

779779
auto paramsType = CollectParameters(stage, ctx);
780+
NDq::TSpillingSettings spillingSettings{Config->GetEnabledSpillingNodes()};
780781
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
781-
ctx, {});
782+
ctx, {}, spillingSettings);
782783

783784
auto& programProto = *stageProto.MutableProgram();
784785
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);

0 commit comments

Comments
 (0)