10
10
11
11
#include < util/datetime/base.h>
12
12
#include < util/string/builder.h>
13
+ #include < util/stream/format.h>
13
14
14
15
#include " kqp_resource_estimation.h"
15
16
@@ -122,6 +123,8 @@ class TTxState : public TAtomicRefCount<TTxState> {
122
123
std::atomic<ui64> TxScanQueryMemory = 0 ;
123
124
std::atomic<ui64> TxExternalDataQueryMemory = 0 ;
124
125
std::atomic<ui32> TxExecutionUnits = 0 ;
126
+ std::atomic<ui64> TxMaxAllocation = 0 ;
127
+ std::atomic<ui64> TxFailedAllocation = 0 ;
125
128
126
129
public:
127
130
explicit TTxState (ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent,
@@ -145,12 +148,14 @@ class TTxState : public TAtomicRefCount<TTxState> {
145
148
146
149
if (!PoolId.empty ()) {
147
150
res << " , PoolId: " << PoolId
148
- << " , MemoryPoolPercent: " << Sprintf (" %.2f" , MemoryPoolPercent);
151
+ << " , MemoryPoolPercent: " << Sprintf (" %.2f" , MemoryPoolPercent > 0 ? MemoryPoolPercent : 100 );
149
152
}
150
153
151
- res << " , memory initially granted resources: " << TxExternalDataQueryMemory.load ()
152
- << " , tx total allocations " << TxScanQueryMemory.load ()
153
- << " , execution units: " << TxExecutionUnits.load ()
154
+ res << " , tx initially granted memory: " << HumanReadableSize (TxExternalDataQueryMemory.load (), SF_BYTES)
155
+ << " , tx total memory allocations: " << HumanReadableSize (TxScanQueryMemory.load (), SF_BYTES)
156
+ << " , tx largest successful memory allocation: " << HumanReadableSize (TxMaxAllocation.load (), SF_BYTES)
157
+ << " , tx largest failed memory allocation: " << HumanReadableSize (TxFailedAllocation.load (), SF_BYTES)
158
+ << " , tx total execution units: " << TxExecutionUnits.load ()
154
159
<< " , started at: " << CreatedAt
155
160
<< " }" ;
156
161
@@ -161,6 +166,11 @@ class TTxState : public TAtomicRefCount<TTxState> {
161
166
return TxScanQueryMemory.load ();
162
167
}
163
168
169
+ void AckFailedMemoryAlloc (ui64 memory) {
170
+ ui64 maxAlloc = TxFailedAllocation.load ();
171
+ while (maxAlloc < memory && !TxFailedAllocation.compare_exchange_weak (maxAlloc, memory));
172
+ }
173
+
164
174
void Released (TIntrusivePtr<TTaskState>& taskState, const TKqpResourcesRequest& resources) {
165
175
if (resources.ExecutionUnits ) {
166
176
Counters->RmOnCompleteFree ->Inc ();
@@ -176,6 +186,9 @@ class TTxState : public TAtomicRefCount<TTxState> {
176
186
taskState->ScanQueryMemory -= resources.Memory ;
177
187
Counters->RmMemory ->Sub (resources.Memory );
178
188
189
+ ui64 maxAlloc = TxMaxAllocation.load ();
190
+ while (maxAlloc < resources.Memory && !TxMaxAllocation.compare_exchange_weak (maxAlloc, resources.Memory ));
191
+
179
192
TxExecutionUnits.fetch_sub (resources.ExecutionUnits );
180
193
taskState->ExecutionUnits -= resources.ExecutionUnits ;
181
194
Counters->RmComputeActors ->Sub (resources.ExecutionUnits );
0 commit comments