Skip to content

Commit a77c0ad

Browse files
authored
support memory pools configurations & tune spilling settings (#7510)
1 parent 93d3c0a commit a77c0ad

11 files changed

+152
-71
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,13 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1414
, std::shared_ptr<IKqpNodeState> state
1515
, TIntrusivePtr<NRm::TTxState> tx
1616
, TIntrusivePtr<NRm::TTaskState> task
17-
, ui64 limit
18-
, ui64 reasonableSpillingTreshold)
17+
, ui64 limit)
1918
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
2019
, ResourceManager(std::move(resourceManager))
2120
, MemoryPool(memoryPool)
2221
, State(std::move(state))
2322
, Tx(std::move(tx))
2423
, Task(std::move(task))
25-
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
2624
{
2725
}
2826

@@ -57,7 +55,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
5755
}
5856

5957
bool IsReasonableToUseSpilling() const override {
60-
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
58+
return Task->IsReasonableToStartSpilling();
6159
}
6260

6361
TString MemoryConsumptionDetails() const override {
@@ -88,7 +86,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
8886
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
8987
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
9088
std::atomic<ui64> MinChannelBufferSize = 0;
91-
std::atomic<ui64> ReasonableSpillingTreshold = 0;
9289

9390
public:
9491
TKqpCaFactory(const NKikimrConfig::TTableServiceConfig::TResourceManager& config,
@@ -107,7 +104,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
107104
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
108105
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
109106
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
110-
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
111107
}
112108

113109
TActorStartResult CreateKqpComputeActor(TCreateArgs&& args) override {
@@ -158,8 +154,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
158154
std::move(args.State),
159155
std::move(args.TxInfo),
160156
std::move(task),
161-
limit,
162-
ReasonableSpillingTreshold.load());
157+
limit);
163158

164159
auto runtimeSettings = args.RuntimeSettings;
165160
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ struct IKqpNodeComputeActorFactory {
122122
const TInstant& Deadline;
123123
const bool ShareMailbox;
124124
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;
125+
125126
TComputeStagesWithScan* ComputesByStages = nullptr;
126127
std::shared_ptr<IKqpNodeState> State = nullptr;
127128
TComputeActorSchedulingOptions SchedulingOptions = {};

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
223223
}
224224

225225
request.SetSchedulerGroup(UserRequestContext->PoolId);
226+
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
226227

227228
return result;
228229
}
@@ -351,7 +352,8 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
351352
NYql::NDq::TComputeRuntimeSettings settings;
352353
if (!TxInfo) {
353354
TxInfo = MakeIntrusive<NRm::TTxState>(
354-
TxId, TInstant::Now(), ResourceManager_->GetCounters());
355+
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
356+
UserRequestContext->PoolId, UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
355357
}
356358

357359
auto startResult = CaFactory_->CreateKqpComputeActor({
@@ -370,7 +372,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
370372
.StatsMode = GetDqStatsMode(StatsMode),
371373
.Deadline = Deadline,
372374
.ShareMailbox = (computeTasksSize <= 1),
373-
.RlPath = Nothing()
375+
.RlPath = Nothing(),
374376
});
375377

376378
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
205205
}
206206

207207
TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
208-
txId, TInstant::Now(), ResourceManager_->GetCounters());
208+
txId, TInstant::Now(), ResourceManager_->GetCounters(),
209+
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent());
209210

210211
const ui32 tasksCount = msg.GetTasks().size();
211212
for (auto& dqTask: *msg.MutableTasks()) {
@@ -246,7 +247,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
246247
.RlPath = rlPath,
247248
.ComputesByStages = &computesByStage,
248249
.State = State_,
249-
.SchedulingOptions = std::move(schedulingOptions)
250+
.SchedulingOptions = std::move(schedulingOptions),
250251
});
251252

252253
if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {

0 commit comments

Comments
 (0)