@@ -264,6 +264,10 @@ class TKqpResourceManager : public IKqpResourceManager {
264
264
// all other requests are not guaranteed to be satisfied.
265
265
// In the nearest future we need to implement several layers of memory requests.
266
266
bool isFirstAllocationRequest = (resources.ExecutionUnits > 0 && resources.MemoryPool == EKqpMemoryPool::DataQuery);
267
+ if (resources.ExecutionUnits > 0 ) {
268
+ Counters->RmOnStartAllocs ->Inc ();
269
+ }
270
+
267
271
if (isFirstAllocationRequest) {
268
272
auto & txBucket = TxBucket (txId);
269
273
with_lock (txBucket.Lock ) {
@@ -370,6 +374,12 @@ class TKqpResourceManager : public IKqpResourceManager {
370
374
371
375
auto & txBucket = TxBucket (txId);
372
376
377
+ if (resources.ReleaseAllResources ) {
378
+ Counters->RmOnCompleteFree ->Inc ();
379
+ } else {
380
+ Counters->RmExtraMemFree ->Inc ();
381
+ }
382
+
373
383
{
374
384
TMaybe<TGuard<TMutex>> guard;
375
385
guard.ConstructInPlace (txBucket.Lock );
@@ -385,6 +395,7 @@ class TKqpResourceManager : public IKqpResourceManager {
385
395
return ;
386
396
}
387
397
398
+ guard.Clear ();
388
399
auto & task = taskIt->second ;
389
400
if (resources.ReleaseAllResources && task.ExecutionUnits ) {
390
401
FreeExecutionUnits (task.ExecutionUnits );
@@ -400,10 +411,7 @@ class TKqpResourceManager : public IKqpResourceManager {
400
411
}
401
412
402
413
task.ScanQueryMemory -= releaseScanQueryMemory;
403
- tx.TxScanQueryMemory -= releaseScanQueryMemory;
404
-
405
414
task.ExternalDataQueryMemory -= releaseExternalDataQueryMemory;
406
- tx.TxExternalDataQueryMemory -= releaseExternalDataQueryMemory;
407
415
408
416
if (task.ScanQueryMemory == 0 ) {
409
417
if (task.ResourceBrokerTaskId ) {
@@ -419,6 +427,9 @@ class TKqpResourceManager : public IKqpResourceManager {
419
427
Y_DEBUG_ABORT_UNLESS (reduced);
420
428
}
421
429
430
+ guard.ConstructInPlace (txBucket.Lock );
431
+ tx.TxScanQueryMemory -= releaseScanQueryMemory;
432
+ tx.TxExternalDataQueryMemory -= releaseExternalDataQueryMemory;
422
433
if (resources.ExecutionUnits ) {
423
434
ui64 remainsTasks = tx.Tasks .size () - 1 ;
424
435
if (remainsTasks == 0 ) {
@@ -428,16 +439,20 @@ class TKqpResourceManager : public IKqpResourceManager {
428
439
}
429
440
}
430
441
442
+ guard.Clear ();
443
+
431
444
i64 prev = ExternalDataQueryMemory.fetch_sub (releaseExternalDataQueryMemory);
432
445
Counters->RmExternalMemory ->Sub (releaseExternalDataQueryMemory);
433
446
Y_DEBUG_ABORT_UNLESS (prev >= 0 );
434
447
Counters->RmMemory ->Sub (releaseScanQueryMemory);
435
448
Y_DEBUG_ABORT_UNLESS (Counters->RmMemory ->Val () >= 0 );
436
449
} // with_lock (txBucket.Lock)
437
450
438
- with_lock (Lock) {
439
- ScanQueryMemoryResource.Release (releaseScanQueryMemory);
440
- } // with_lock (Lock)
451
+ if (releaseScanQueryMemory > 0 ) {
452
+ with_lock (Lock) {
453
+ ScanQueryMemoryResource.Release (releaseScanQueryMemory);
454
+ } // with_lock (Lock)
455
+ }
441
456
442
457
LOG_AS_D (" TxId: " << txId << " , taskId: " << taskId << " . Released resources, "
443
458
<< " ScanQueryMemory: " << releaseScanQueryMemory << " , "
0 commit comments