Skip to content

Commit b569c23

Browse files
authored
fixes in memory pools (#7623)
1 parent c0b4856 commit b569c23

File tree

7 files changed

+94
-18
lines changed

7 files changed

+94
-18
lines changed

ydb/core/kqp/executer_actor/kqp_planner.cpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
223223
}
224224

225225
request.SetSchedulerGroup(UserRequestContext->PoolId);
226-
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
226+
request.SetDatabase(Database);
227+
if (UserRequestContext->PoolConfig.has_value()) {
228+
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
229+
}
227230

228231
return result;
229232
}
@@ -351,9 +354,14 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
351354
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
352355
NYql::NDq::TComputeRuntimeSettings settings;
353356
if (!TxInfo) {
357+
double memoryPoolPercent = 100;
358+
if (UserRequestContext->PoolConfig.has_value()) {
359+
memoryPoolPercent = UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode;
360+
}
361+
354362
TxInfo = MakeIntrusive<NRm::TTxState>(
355363
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
356-
UserRequestContext->PoolId, UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
364+
UserRequestContext->PoolId, memoryPoolPercent, Database);
357365
}
358366

359367
auto startResult = CaFactory_->CreateKqpComputeActor({

ydb/core/kqp/node_service/kqp_node_service.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
206206

207207
TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
208208
txId, TInstant::Now(), ResourceManager_->GetCounters(),
209-
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent());
209+
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
210+
msg.GetDatabase());
210211

211212
const ui32 tasksCount = msg.GetTasks().size();
212213
for (auto& dqTask: *msg.MutableTasks()) {

ydb/core/kqp/rm_service/kqp_rm_service.cpp

+25-9
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ class TMemoryResource : public TAtomicRefCount<TMemoryResource> {
9292
SpillingCookie->SpillingPercentReached.store(Available() < OverLimit);
9393
}
9494

95+
// Returns the current value of Used, i.e. the amount that Release() subtracts from.
ui64 GetUsed() const {
96+
return Used;
97+
}
98+
9599
void Release(ui64 value) {
96100
if (Used > value) {
97101
Used -= value;
@@ -103,7 +107,7 @@ class TMemoryResource : public TAtomicRefCount<TMemoryResource> {
103107
}
104108

105109
void SetNewLimit(ui64 baseLimit, double memoryPoolPercent, double overPercent) {
106-
if (abs(memoryPoolPercent - MemoryPoolPercent) < MYEPS && baseLimit != BaseLimit)
110+
if (abs(memoryPoolPercent - MemoryPoolPercent) < MYEPS && baseLimit == BaseLimit)
107111
return;
108112

109113
BaseLimit = baseLimit;
@@ -256,7 +260,7 @@ class TKqpResourceManager : public IKqpResourceManager {
256260
task->TotalMemoryCookie = TotalMemoryResource->GetSpillingCookie();
257261

258262
if (hasScanQueryMemory && !tx->PoolId.empty() && tx->MemoryPoolPercent > 0) {
259-
auto [it, success] = MemoryNamedPools.emplace(tx->PoolId, nullptr);
263+
auto [it, success] = MemoryNamedPools.emplace(tx->MakePoolId(), nullptr);
260264

261265
if (success) {
262266
it->second = MakeIntrusive<TMemoryResource>(TotalMemoryResource->GetLimit(), tx->MemoryPoolPercent, SpillingPercent.load());
@@ -291,9 +295,15 @@ class TKqpResourceManager : public IKqpResourceManager {
291295
Counters->RmNotEnoughMemory->Inc();
292296
with_lock (Lock) {
293297
TotalMemoryResource->Release(resources.Memory);
294-
auto it = MemoryNamedPools.find(tx->PoolId);
295-
if (it != MemoryNamedPools.end()) {
296-
it->second->Release(resources.Memory);
298+
// Give the memory back to the transaction's named pool (keyed by MakePoolId()), if it has one.
if (!tx->PoolId.empty()) {
299+
auto it = MemoryNamedPools.find(tx->MakePoolId());
300+
if (it != MemoryNamedPools.end()) {  // find() can miss; only a found entry may be dereferenced
301+
it->second->Release(resources.Memory);
302+
303+
if (it->second->GetUsed() == 0) {  // nothing left charged to this pool: drop its entry
304+
MemoryNamedPools.erase(it);
305+
}
306+
}
297307
}
298308
}
299309
}
@@ -356,9 +366,15 @@ class TKqpResourceManager : public IKqpResourceManager {
356366
if (resources.Memory > 0) {
357367
with_lock (Lock) {
358368
TotalMemoryResource->Release(resources.Memory);
359-
auto it = MemoryNamedPools.find(tx->PoolId);
360-
if (it != MemoryNamedPools.end()) {
361-
it->second->Release(resources.Memory);
369+
if (!tx->PoolId.empty()) {
370+
auto it = MemoryNamedPools.find(tx->MakePoolId());
371+
if (it != MemoryNamedPools.end()) {
372+
it->second->Release(resources.Memory);
373+
374+
if (it->second->GetUsed() == 0) {
375+
MemoryNamedPools.erase(it);
376+
}
377+
}
362378
}
363379
}
364380
}
@@ -509,7 +525,7 @@ class TKqpResourceManager : public IKqpResourceManager {
509525
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
510526
TActorId ResourceInfoExchanger = TActorId();
511527

512-
absl::flat_hash_map<TString, TIntrusivePtr<TMemoryResource>> MemoryNamedPools;
528+
absl::flat_hash_map<std::pair<TString, TString>, TIntrusivePtr<TMemoryResource>, THash<std::pair<TString, TString>>> MemoryNamedPools;
513529
};
514530

515531
struct TResourceManagers {

ydb/core/kqp/rm_service/kqp_rm_service.h

+20-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include <array>
1717
#include <bitset>
1818
#include <functional>
19+
#include <utility>
1920

2021

2122
namespace NKikimr {
@@ -115,31 +116,45 @@ class TTxState : public TAtomicRefCount<TTxState> {
115116
TIntrusivePtr<TKqpCounters> Counters;
116117
const TString PoolId;
117118
const double MemoryPoolPercent;
119+
const TString Database;
118120

119121
private:
120122
std::atomic<ui64> TxScanQueryMemory = 0;
121123
std::atomic<ui64> TxExternalDataQueryMemory = 0;
122124
std::atomic<ui32> TxExecutionUnits = 0;
123125

124126
public:
125-
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent)
127+
// Stores the transaction id, creation time, counters (moved in), pool id,
// memory pool percent and database for this transaction state.
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent,
128+
const TString& database)
126129
: TxId(txId)
127130
, CreatedAt(now)
128131
, Counters(std::move(counters))
129132
, PoolId(poolId)
130133
, MemoryPoolPercent(memoryPoolPercent)
134+
, Database(database)
131135
{}
132136

137+
// Returns the (Database, PoolId) pair, so that pools with the same id in
// different databases produce different keys.
std::pair<TString, TString> MakePoolId() const {
138+
return std::make_pair(Database, PoolId);
139+
}
140+
133141
// Renders a human-readable summary of this transaction's resource state.
// PoolId and MemoryPoolPercent are printed only when PoolId is non-empty.
TString ToString() const {
134-
return TStringBuilder() << "TxResourcesInfo{ "
142+
auto res = TStringBuilder() << "TxResourcesInfo{ "
135143
<< "TxId: " << TxId
136-
<< ", PoolId: " << PoolId
137-
<< ", MemoryPoolPercent: " << MemoryPoolPercent
138-
<< ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
144+
<< ", Database: " << Database;  // leading ", " keeps this field separated from TxId
145+
146+
if (!PoolId.empty()) {
147+
res << ", PoolId: " << PoolId
148+
<< ", MemoryPoolPercent: " << Sprintf("%.2f", MemoryPoolPercent);
149+
}
150+
151+
res << ", memory initially granted resources: " << TxExternalDataQueryMemory.load()
139152
<< ", extra allocations " << TxScanQueryMemory.load()
140153
<< ", execution units: " << TxExecutionUnits.load()
141154
<< ", started at: " << CreatedAt
142155
<< " }";
156+
157+
return res;
143158
}
144159

145160
ui64 GetExtraMemoryAllocatedSize() {

ydb/core/kqp/rm_service/kqp_rm_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ class KqpRm : public TTestBase {
184184
}
185185

186186
// Test helper: builds a TTxState for txId using rm's counters, with an empty
// pool id, a memory pool percent of 100 and (on the added side) an empty database.
TIntrusivePtr<NRm::TTxState> MakeTx(ui64 txId, std::shared_ptr<NRm::IKqpResourceManager> rm) {
187-
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100);
187+
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100, "");
188188
}
189189

190190
TIntrusivePtr<NRm::TTaskState> MakeTask(ui64 taskId, TIntrusivePtr<NRm::TTxState> tx) {

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

+35
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/kqp/counters/kqp_counters.h>
44
#include <ydb/library/ydb_issue/proto/issue_id.pb.h>
55

6+
#include <ydb/core/tablet/resource_broker.h>
67
#include <util/random/random.h>
78

89
namespace NKikimr {
@@ -11,6 +12,38 @@ namespace NKqp {
1112
using namespace NYdb;
1213
using namespace NYdb::NTable;
1314

15+
using namespace NResourceBroker;
16+
17+
// Builds a resource broker config for tests: two queues ("queue_default" and
// "queue_kqp_resource_manager"), one task type bound to each queue, and a
// global resource limit.
// NOTE(review): the meaning of each positional AddResource() value is not
// visible here (presumably CPU first, then memory) — confirm against the proto.
NKikimrResourceBroker::TResourceBrokerConfig MakeResourceBrokerTestConfig() {
18+
NKikimrResourceBroker::TResourceBrokerConfig config;
19+
20+
auto queue = config.AddQueues();
21+
queue->SetName("queue_default");
22+
queue->SetWeight(5);
23+
queue->MutableLimit()->AddResource(4);
24+
25+
queue = config.AddQueues();
26+
queue->SetName("queue_kqp_resource_manager");
27+
queue->SetWeight(20);
28+
queue->MutableLimit()->AddResource(4);
29+
queue->MutableLimit()->AddResource(50'000);
30+
31+
auto task = config.AddTasks();
32+
task->SetName("unknown");
33+
task->SetQueueName("queue_default");
34+
task->SetDefaultDuration(TDuration::Seconds(5).GetValue());
35+
36+
task = config.AddTasks();
37+
task->SetName(NLocalDb::KqpResourceManagerTaskName);  // task type routed to the KQP resource manager queue
38+
task->SetQueueName("queue_kqp_resource_manager");
39+
task->SetDefaultDuration(TDuration::Seconds(5).GetValue());
40+
41+
config.MutableResourceLimit()->AddResource(10);
42+
config.MutableResourceLimit()->AddResource(100'000);
43+
44+
return config;
45+
}
46+
1447
namespace {
1548
bool IsRetryable(const EStatus& status) {
1649
return status == EStatus::OVERLOADED;
@@ -133,6 +166,8 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
133166
app.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10);
134167
app.MutableTableServiceConfig()->MutableResourceManager()->SetQueryMemoryLimit(2000);
135168

169+
app.MutableResourceBrokerConfig()->CopyFrom(MakeResourceBrokerTestConfig());
170+
136171
TKikimrRunner kikimr(app);
137172
CreateLargeTable(kikimr, 0, 0, 0);
138173

ydb/core/protos/kqp.proto

+1
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ message TEvStartKqpTasksRequest {
551551
optional string SerializedGUCSettings = 8;
552552
optional string SchedulerGroup = 9;
553553
optional double MemoryPoolPercent = 10 [default = 100];
554+
optional string Database = 11;
554555
}
555556

556557
message TEvStartKqpTasksResponse {

0 commit comments

Comments
 (0)