@@ -312,7 +312,6 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
312
312
313
313
TDuration TColumnEngineForLogs::ProcessTiering (const ui64 pathId, const TTiering& ttl, TTieringProcessContext& context) const {
314
314
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " ProcessTiering" )(" path_id" , pathId)(" ttl" , ttl.GetDebugString ());
315
- ui64 evictionSize = 0 ;
316
315
ui64 dropBlobs = 0 ;
317
316
auto & indexInfo = VersionedIndex.GetLastSchema ()->GetIndexInfo ();
318
317
Y_ABORT_UNLESS (context.Changes ->Tiering .emplace (pathId, ttl).second );
@@ -340,9 +339,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
340
339
continue ;
341
340
}
342
341
343
- context.AllowEviction = (evictionSize <= context.MaxEvictBytes );
344
342
context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
345
- const bool tryEvictPortion = context. AllowEviction && ttl. HasTiers ();
343
+ const bool tryEvictPortion = ttl. HasTiers () && context. HasMemoryForEviction ();
346
344
347
345
if (auto max = info->MaxValue (ttlColumnId)) {
348
346
bool keep = !expireTimestampOpt;
@@ -390,8 +388,8 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
390
388
}
391
389
if (currentTierName != tierName) {
392
390
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " tiering switch detected" )(" from" , currentTierName)(" to" , tierName);
393
- evictionSize += info->BlobsSizes ().first ;
394
391
context.Changes ->AddPortionToEvict (*info, TPortionEvictionFeatures (tierName, pathId, StoragesManager->GetOperator (tierName)));
392
+ context.AppPortionForCheckMemoryUsage (*info);
395
393
SignalCounters.OnPortionToEvict (info->BlobsBytes ());
396
394
}
397
395
}
@@ -406,7 +404,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
406
404
SignalCounters.OnPortionNoBorder (info->BlobsBytes ());
407
405
}
408
406
}
409
- if (dWaiting > TDuration::MilliSeconds (500 ) && (!context.AllowEviction || !context.AllowDrop )) {
407
+ if (dWaiting > TDuration::MilliSeconds (500 ) && (!context.HasMemoryForEviction () || !context.AllowDrop )) {
410
408
dWaiting = TDuration::MilliSeconds (500 );
411
409
}
412
410
Y_ABORT_UNLESS (!!dWaiting);
@@ -447,7 +445,7 @@ bool TColumnEngineForLogs::DrainEvictionQueue(std::map<TMonotonic, std::vector<T
447
445
return hasChanges;
448
446
}
449
447
450
- std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl (const THashMap<ui64, TTiering>& pathEviction, const THashSet<TPortionAddress>& busyPortions, ui64 maxEvictBytes ) noexcept {
448
+ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl (const THashMap<ui64, TTiering>& pathEviction, const THashSet<TPortionAddress>& busyPortions, const ui64 memoryUsageLimit ) noexcept {
451
449
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " StartTtl" )(" external" , pathEviction.size ())
452
450
(" internal" , EvictionsController.MutableNextCheckInstantForTierings ().size ())
453
451
;
@@ -456,7 +454,7 @@ std::shared_ptr<TTTLColumnEngineChanges> TColumnEngineForLogs::StartTtl(const TH
456
454
457
455
auto changes = std::make_shared<TTTLColumnEngineChanges>(TSplitSettings (), saverContext);
458
456
459
- TTieringProcessContext context (maxEvictBytes , changes, busyPortions);
457
+ TTieringProcessContext context (memoryUsageLimit , changes, busyPortions, TTTLColumnEngineChanges::BuildMemoryPredictor () );
460
458
bool hasExternalChanges = false ;
461
459
for (auto && i : pathEviction) {
462
460
context.DurationsForced [i.first ] = ProcessTiering (i.first , i.second , context);
@@ -575,9 +573,11 @@ void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) {
575
573
AFL_VERIFY (Tables.emplace (pathId, std::make_shared<TGranuleMeta>(pathId, GranulesStorage, SignalCounters.RegisterGranuleDataCounters (), VersionedIndex)).second );
576
574
}
577
575
578
- TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext (const ui64 maxEvictBytes, std::shared_ptr<TTTLColumnEngineChanges> changes, const THashSet<TPortionAddress>& busyPortions)
579
- : Now(TlsActivationContext ? AppData()->TimeProvider->Now () : TInstant::Now())
580
- , MaxEvictBytes(maxEvictBytes)
576
+ TColumnEngineForLogs::TTieringProcessContext::TTieringProcessContext (const ui64 memoryUsageLimit,
577
+ std::shared_ptr<TTTLColumnEngineChanges> changes, const THashSet<TPortionAddress>& busyPortions, const std::shared_ptr<TColumnEngineChanges::IMemoryPredictor>& memoryPredictor)
578
+ : MemoryUsageLimit(memoryUsageLimit)
579
+ , MemoryPredictor(memoryPredictor)
580
+ , Now(TlsActivationContext ? AppData()->TimeProvider->Now () : TInstant::Now())
581
581
, Changes(changes)
582
582
, BusyPortions(busyPortions)
583
583
{
0 commit comments