@@ -12,75 +12,71 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
12
12
TMemoryQuotaManager (std::shared_ptr<NRm::IKqpResourceManager> resourceManager
13
13
, NRm::EKqpMemoryPool memoryPool
14
14
, std::shared_ptr<IKqpNodeState> state
15
- , ui64 txId
16
- , ui64 taskId
15
+ , TIntrusivePtr<NRm::TTxState> tx
16
+ , TIntrusivePtr<NRm::TTaskState> task
17
17
, ui64 limit
18
18
, ui64 reasonableSpillingTreshold)
19
19
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
20
20
, ResourceManager(std::move(resourceManager))
21
21
, MemoryPool(memoryPool)
22
22
, State(std::move(state))
23
- , TxId(txId )
24
- , TaskId(taskId )
23
+ , Tx(std::move(tx) )
24
+ , Task(std::move(task) )
25
25
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
26
26
{
27
27
}
28
28
29
29
~TMemoryQuotaManager () override {
30
30
if (State) {
31
- State->OnTaskTerminate (TxId, TaskId, Success);
31
+ State->OnTaskTerminate (Tx-> TxId , Task-> TaskId , Success);
32
32
}
33
33
34
- ResourceManager->FreeResources (TxId, TaskId );
34
+ ResourceManager->FreeResources (Tx, Task );
35
35
}
36
36
37
37
bool AllocateExtraQuota (ui64 extraSize) override {
38
- auto result = ResourceManager->AllocateResources (TxId, TaskId ,
38
+ auto result = ResourceManager->AllocateResources (Tx, Task ,
39
39
NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
40
40
41
41
if (!result) {
42
42
AFL_WARN (NKikimrServices::KQP_COMPUTE)
43
43
(" problem" , " cannot_allocate_memory" )
44
- (" tx_id" , TxId)
45
- (" task_id" , TaskId)
44
+ (" tx_id" , Tx-> TxId )
45
+ (" task_id" , Task-> TaskId )
46
46
(" memory" , extraSize);
47
47
48
48
return false ;
49
49
}
50
50
51
- TotalQueryAllocationsSize = result.TotalAllocatedQueryMemory ;
52
-
53
51
return true ;
54
52
}
55
53
56
54
void FreeExtraQuota (ui64 extraSize) override {
57
- ResourceManager->FreeResources (TxId, TaskId,
58
- NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize}
59
- );
55
+ NRm::TKqpResourcesRequest request = NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize};
56
+ ResourceManager->FreeResources (Tx, Task, Task->FitRequest (request));
60
57
}
61
58
62
59
bool IsReasonableToUseSpilling () const override {
63
- return TotalQueryAllocationsSize >= ReasonableSpillingTreshold;
60
+ return Tx-> GetExtraMemoryAllocatedSize () >= ReasonableSpillingTreshold;
64
61
}
65
62
66
63
TString MemoryConsumptionDetails () const override {
67
- return ResourceManager-> GetTxResourcesUsageDebugInfo (TxId );
64
+ return Tx-> ToString ( );
68
65
}
69
66
70
67
void TerminateHandler (bool success, const NYql::TIssues& issues) {
71
68
AFL_DEBUG (NKikimrServices::KQP_COMPUTE)
72
69
(" problem" , " finish_compute_actor" )
73
- (" tx_id" , TxId)(" task_id" , TaskId)(" success" , success)(" message" , issues.ToOneLineString ());
70
+ (" tx_id" , Tx-> TxId )(" task_id" , Task-> TaskId )(" success" , success)(" message" , issues.ToOneLineString ());
74
71
Success = success;
75
72
}
76
73
77
74
std::shared_ptr<NRm::IKqpResourceManager> ResourceManager;
78
75
NRm::EKqpMemoryPool MemoryPool;
79
76
std::shared_ptr<IKqpNodeState> State;
80
- ui64 TxId ;
81
- ui64 TaskId ;
77
+ TIntrusivePtr<NRm::TTxState> Tx ;
78
+ TIntrusivePtr<NRm::TTaskState> Task ;
82
79
bool Success = true ;
83
- ui64 TotalQueryAllocationsSize = 0 ;
84
80
ui64 ReasonableSpillingTreshold = 0 ;
85
81
};
86
82
@@ -126,8 +122,10 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
126
122
resourcesRequest.ExecutionUnits = 1 ;
127
123
resourcesRequest.Memory = memoryLimits.MkqlLightProgramMemoryLimit ;
128
124
125
+ TIntrusivePtr<NRm::TTaskState> task = MakeIntrusive<NRm::TTaskState>(args.Task ->GetId (), args.TxInfo ->CreatedAt );
126
+
129
127
auto rmResult = ResourceManager_->AllocateResources (
130
- args.TxId , args. Task -> GetId () , resourcesRequest);
128
+ args.TxInfo , task , resourcesRequest);
131
129
132
130
if (!rmResult) {
133
131
return NRm::TKqpRMAllocateResult{rmResult};
@@ -158,8 +156,8 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
158
156
ResourceManager_,
159
157
args.MemoryPool ,
160
158
std::move (args.State ),
161
- args.TxId ,
162
- args. Task -> GetId ( ),
159
+ std::move ( args.TxInfo ) ,
160
+ std::move (task ),
163
161
limit,
164
162
ReasonableSpillingTreshold.load ());
165
163
0 commit comments