Skip to content

Commit eb0a2c4

Browse files
committed
avoid locks in cases of free tier allocations and don't keep transactions in hash map in resource manager
1 parent 6583eb3 commit eb0a2c4

File tree

5 files changed

+224
-252
lines changed

5 files changed

+224
-252
lines changed

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,75 +12,71 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
1212
TMemoryQuotaManager(std::shared_ptr<NRm::IKqpResourceManager> resourceManager
1313
, NRm::EKqpMemoryPool memoryPool
1414
, std::shared_ptr<IKqpNodeState> state
15-
, ui64 txId
16-
, ui64 taskId
15+
, TIntrusivePtr<NRm::TTxState> tx
16+
, TIntrusivePtr<NRm::TTaskState> task
1717
, ui64 limit
1818
, ui64 reasonableSpillingTreshold)
1919
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
2020
, ResourceManager(std::move(resourceManager))
2121
, MemoryPool(memoryPool)
2222
, State(std::move(state))
23-
, TxId(txId)
24-
, TaskId(taskId)
23+
, Tx(std::move(tx))
24+
, Task(std::move(task))
2525
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
2626
{
2727
}
2828

2929
~TMemoryQuotaManager() override {
3030
if (State) {
31-
State->OnTaskTerminate(TxId, TaskId, Success);
31+
State->OnTaskTerminate(Tx->TxId, Task->TaskId, Success);
3232
}
3333

34-
ResourceManager->FreeResources(TxId, TaskId);
34+
ResourceManager->FreeResources(Tx, Task);
3535
}
3636

3737
bool AllocateExtraQuota(ui64 extraSize) override {
38-
auto result = ResourceManager->AllocateResources(TxId, TaskId,
38+
auto result = ResourceManager->AllocateResources(Tx, Task,
3939
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
4040

4141
if (!result) {
4242
AFL_WARN(NKikimrServices::KQP_COMPUTE)
4343
("problem", "cannot_allocate_memory")
44-
("tx_id", TxId)
45-
("task_id", TaskId)
44+
("tx_id", Tx->TxId)
45+
("task_id", Task->TaskId)
4646
("memory", extraSize);
4747

4848
return false;
4949
}
5050

51-
TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory;
52-
5351
return true;
5452
}
5553

5654
void FreeExtraQuota(ui64 extraSize) override {
57-
ResourceManager->FreeResources(TxId, TaskId,
58-
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
59-
);
55+
NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
56+
ResourceManager->FreeResources(Tx, Task, Task->FitRequest(request));
6057
}
6158

6259
bool IsReasonableToUseSpilling() const override {
63-
return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
60+
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
6461
}
6562

6663
TString MemoryConsumptionDetails() const override {
67-
return ResourceManager->GetTxResourcesUsageDebugInfo(TxId);
64+
return Tx->ToString();
6865
}
6966

7067
void TerminateHandler(bool success, const NYql::TIssues& issues) {
7168
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)
7269
("problem", "finish_compute_actor")
73-
("tx_id", TxId)("task_id", TaskId)("success", success)("message", issues.ToOneLineString());
70+
("tx_id", Tx->TxId)("task_id", Task->TaskId)("success", success)("message", issues.ToOneLineString());
7471
Success = success;
7572
}
7673

7774
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
7875
NRm::EKqpMemoryPool MemoryPool;
7976
std::shared_ptr<IKqpNodeState> State;
80-
ui64 TxId;
81-
ui64 TaskId;
77+
TIntrusivePtr<NRm::TTxState> Tx;
78+
TIntrusivePtr<NRm::TTaskState> Task;
8279
bool Success = true;
83-
ui64 TotalQueryAllocationsSize = 0;
8480
ui64 ReasonableSpillingTreshold = 0;
8581
};
8682

@@ -126,8 +122,11 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
126122
resourcesRequest.ExecutionUnits = 1;
127123
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit;
128124

125+
TIntrusivePtr<NRm::TTxState> tx = MakeIntrusive<NRm::TTxState>(args.TxId, TInstant::Now(), ResourceManager_->GetCounters());
126+
TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task->GetId(), tx->CreatedAt);
127+
129128
auto rmResult = ResourceManager_->AllocateResources(
130-
args.TxId, args.Task->GetId(), resourcesRequest);
129+
tx, task, resourcesRequest);
131130

132131
if (!rmResult) {
133132
return NRm::TKqpRMAllocateResult{rmResult};
@@ -158,8 +157,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
158157
ResourceManager_,
159158
args.MemoryPool,
160159
std::move(args.State),
161-
args.TxId,
162-
args.Task->GetId(),
160+
std::move(tx),
161+
std::move(task),
163162
limit,
164163
ReasonableSpillingTreshold.load());
165164

0 commit comments

Comments
 (0)