@@ -94,6 +94,7 @@ namespace NKikimr::NBlobDepot {
94
94
95
95
Become (&TThis::StateFunc);
96
96
Action ();
97
+ UpdateBytesCopiedQ ();
97
98
}
98
99
99
100
void TAssimilator::PassAway () {
@@ -116,8 +117,10 @@ namespace NKikimr::NBlobDepot {
116
117
hFunc (TEvTabletPipe::TEvClientDestroyed, Handle );
117
118
hFunc (TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle );
118
119
cFunc (TEvPrivate::EvResume, Action);
120
+ cFunc (TEvPrivate::EvResumeScanDataForPlanning, HandleResumeScanDataForPlanning);
119
121
cFunc (TEvPrivate::EvResumeScanDataForCopying, HandleResumeScanDataForCopying);
120
122
fFunc (TEvPrivate::EvTxComplete, HandleTxComplete);
123
+ cFunc (TEvPrivate::EvUpdateBytesCopiedQ, UpdateBytesCopiedQ);
121
124
cFunc (TEvents::TSystem::Poison, PassAway);
122
125
123
126
default :
@@ -134,7 +137,11 @@ namespace NKikimr::NBlobDepot {
134
137
if (Self->DecommitState < EDecommitState::BlobsFinished) {
135
138
SendAssimilateRequest ();
136
139
} else if (Self->DecommitState < EDecommitState::BlobsCopied) {
137
- ScanDataForCopying ();
140
+ if (PlanningComplete) {
141
+ ScanDataForCopying ();
142
+ } else {
143
+ ScanDataForPlanning ();
144
+ }
138
145
} else if (Self->DecommitState == EDecommitState::BlobsCopied) {
139
146
Y_ABORT_UNLESS (!PipeId);
140
147
CreatePipe ();
@@ -283,7 +290,61 @@ namespace NKikimr::NBlobDepot {
283
290
}
284
291
}
285
292
293
+ void TAssimilator::ScanDataForPlanning () {
294
+ if (ResumeScanDataForPlanningInFlight) {
295
+ return ;
296
+ }
297
+
298
+ THPTimer timer;
299
+ ui32 numItems = 0 ;
300
+ bool timeout = false ;
301
+
302
+ if (!LastPlanScannedKey) {
303
+ ++Self->Assimilator .CopyIteration ;
304
+ Self->Assimilator .BytesToCopy = 0 ;
305
+ }
306
+
307
+ TData::TScanRange range{
308
+ LastPlanScannedKey ? TData::TKey (*LastPlanScannedKey) : TData::TKey::Min (),
309
+ TData::TKey::Max (),
310
+ };
311
+ Self->Data ->ScanRange (range, nullptr , nullptr , [&](const TData::TKey& key, const TData::TValue& value) {
312
+ if (++numItems == 1000 ) {
313
+ numItems = 0 ;
314
+ if (TDuration::Seconds (timer.Passed ()) >= TDuration::MilliSeconds (1 )) {
315
+ timeout = true ;
316
+ return false ;
317
+ }
318
+ }
319
+ if (value.GoingToAssimilate ) {
320
+ Self->Assimilator .BytesToCopy += key.GetBlobId ().BlobSize ();
321
+ }
322
+ LastPlanScannedKey.emplace (key.GetBlobId ());
323
+ return true ;
324
+ });
325
+
326
+ if (timeout) {
327
+ ResumeScanDataForPlanningInFlight = true ;
328
+ TActivationContext::Send (new IEventHandle (TEvPrivate::EvResumeScanDataForPlanning, 0 , SelfId (), {}, nullptr , 0 ));
329
+ return ;
330
+ }
331
+
332
+ ActionInProgress = false ;
333
+ PlanningComplete = true ;
334
+ Action ();
335
+ }
336
+
337
+ void TAssimilator::HandleResumeScanDataForPlanning () {
338
+ Y_ABORT_UNLESS (ResumeScanDataForPlanningInFlight);
339
+ ResumeScanDataForPlanningInFlight = false ;
340
+ ScanDataForPlanning ();
341
+ }
342
+
286
343
void TAssimilator::ScanDataForCopying () {
344
+ if (ResumeScanDataForCopyingInFlight) {
345
+ return ;
346
+ }
347
+
287
348
STLOG (PRI_DEBUG, BLOB_DEPOT, BDT54, " TAssimilator::ScanDataForCopying" , (Id, Self->GetLogId ()),
288
349
(LastScannedKey, LastScannedKey), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size ()));
289
350
@@ -324,11 +385,8 @@ namespace NKikimr::NBlobDepot {
324
385
(EntriesToProcess, EntriesToProcess), (Timeout, timeout), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size ()));
325
386
326
387
if (timeout) { // timeout hit, reschedule work
327
- if (!ResumeScanDataForCopyingInFlight) {
328
- TActivationContext::Send (new IEventHandle (TEvPrivate::EvResumeScanDataForCopying, 0 , SelfId (), {}, nullptr , 0 ));
329
- ResumeScanDataForCopyingInFlight = true ;
330
- }
331
- break ;
388
+ TActivationContext::Send (new IEventHandle (TEvPrivate::EvResumeScanDataForCopying, 0 , SelfId (), {}, nullptr , 0 ));
389
+ ResumeScanDataForCopyingInFlight = true ;
332
390
} else if (!ScanQ.empty ()) {
333
391
using TQuery = TEvBlobStorage::TEvGet::TQuery;
334
392
const ui32 sz = ScanQ.size ();
@@ -345,15 +403,18 @@ namespace NKikimr::NBlobDepot {
345
403
GetIdToUnprocessedPuts.try_emplace (getId);
346
404
ScanQ.clear ();
347
405
TotalSize = 0 ;
348
- } else if (!GetIdToUnprocessedPuts.empty ()) { // there are some unprocessed get queries, still have to wait
349
- break ;
406
+ continue ;
407
+ } else if (!GetIdToUnprocessedPuts.empty ()) {
408
+ // there are some unprocessed get queries, still have to wait
350
409
} else if (!EntriesToProcess) { // we have finished scanning the whole table without any entries, copying is done
351
410
OnCopyDone ();
352
- break ;
353
411
} else { // we have finished scanning, but we have replicated some data, restart scanning to ensure that nothing left
354
412
LastScannedKey.reset ();
355
- EntriesToProcess = false ;
413
+ LastPlanScannedKey.reset ();
414
+ EntriesToProcess = PlanningComplete = ActionInProgress = false ;
415
+ Action ();
356
416
}
417
+ break ;
357
418
}
358
419
}
359
420
@@ -365,7 +426,7 @@ namespace NKikimr::NBlobDepot {
365
426
366
427
void TAssimilator::Handle (TEvBlobStorage::TEvGetResult::TPtr ev) {
367
428
auto & msg = *ev->Get ();
368
- (msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkGet : Self->AssimilatorLatestOkPut ) = TInstant::Now ();
429
+ (msg.Status == NKikimrProto::OK ? Self->Assimilator . LatestOkGet : Self->Assimilator . LatestErrorGet ) = TInstant::Now ();
369
430
const auto it = GetIdToUnprocessedPuts.find (ev->Cookie );
370
431
Y_ABORT_UNLESS (it != GetIdToUnprocessedPuts.end ());
371
432
ui32 getBytes = 0 ;
@@ -389,26 +450,25 @@ namespace NKikimr::NBlobDepot {
389
450
++it->second ;
390
451
}
391
452
getBytes += resp.Id .BlobSize ();
392
- ++Self->AssimilatorBlobsReadOk ;
453
+ ++Self->Assimilator . BlobsReadOk ;
393
454
} else if (resp.Status == NKikimrProto::NODATA) {
394
455
Self->Data ->ExecuteTxCommitAssimilatedBlob (NKikimrProto::NODATA, TBlobSeqId (), TData::TKey (resp.Id ),
395
456
TEvPrivate::EvTxComplete, SelfId (), it->first );
396
457
++it->second ;
397
- ++Self->AssimilatorBlobsReadNoData ;
458
+ ++Self->Assimilator .BlobsReadNoData ;
459
+ Self->Assimilator .BytesToCopy -= resp.Id .BlobSize ();
398
460
} else {
399
- ++Self->AssimilatorBlobsReadError ;
461
+ ++Self->Assimilator . BlobsReadError ;
400
462
continue ;
401
463
}
402
- Self->AssimilatorLastReadBlobId = resp.Id ;
464
+ Self->Assimilator . LastReadBlobId = resp.Id ;
403
465
}
404
466
if (getBytes) {
405
467
Self->TabletCounters ->Cumulative ()[NKikimrBlobDepot::COUNTER_DECOMMIT_GET_BYTES] += getBytes;
406
468
}
407
469
if (!it->second ) {
408
470
GetIdToUnprocessedPuts.erase (it);
409
- if (!ResumeScanDataForCopyingInFlight) {
410
- ScanDataForCopying ();
411
- }
471
+ ScanDataForCopying ();
412
472
}
413
473
}
414
474
@@ -417,20 +477,20 @@ namespace NKikimr::NBlobDepot {
417
477
Y_ABORT_UNLESS (it != GetIdToUnprocessedPuts.end ());
418
478
if (!--it->second ) {
419
479
GetIdToUnprocessedPuts.erase (it);
420
- if (!ResumeScanDataForCopyingInFlight) {
421
- ScanDataForCopying ();
422
- }
480
+ ScanDataForCopying ();
423
481
}
424
482
}
425
483
426
484
void TAssimilator::Handle (TEvBlobStorage::TEvPutResult::TPtr ev) {
427
485
auto & msg = *ev->Get ();
428
- (msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkPut : Self->AssimilatorLatestErrorPut ) = TInstant::Now ();
486
+ (msg.Status == NKikimrProto::OK ? Self->Assimilator . LatestOkPut : Self->Assimilator . LatestErrorPut ) = TInstant::Now ();
429
487
if (msg.Status == NKikimrProto::OK) {
430
488
Self->TabletCounters ->Cumulative ()[NKikimrBlobDepot::COUNTER_DECOMMIT_PUT_OK_BYTES] += msg.Id .BlobSize ();
431
- ++Self->AssimilatorBlobsPutOk ;
489
+ ++Self->Assimilator .BlobsPutOk ;
490
+ Self->Assimilator .BytesToCopy -= msg.Id .BlobSize ();
491
+ Self->Assimilator .BytesCopied += msg.Id .BlobSize ();
432
492
} else {
433
- ++Self->AssimilatorBlobsPutError ;
493
+ ++Self->Assimilator . BlobsPutError ;
434
494
}
435
495
const auto it = PutIdToKey.find (ev->Cookie );
436
496
Y_ABORT_UNLESS (it != PutIdToKey.end ());
@@ -554,14 +614,41 @@ namespace NKikimr::NBlobDepot {
554
614
}
555
615
556
616
void TAssimilator::UpdateAssimilatorPosition () const {
557
- Self->AssimilatorPosition = TStringBuilder ()
617
+ Self->Assimilator . Position = TStringBuilder ()
558
618
<< " SkipBlocksUpTo# " << (SkipBlocksUpTo ? ToString (*SkipBlocksUpTo) : " <none>" ) << Endl
559
619
<< " SkipBarriersUpTo# " << (SkipBarriersUpTo
560
620
? TString (TStringBuilder () << std::get<0 >(*SkipBarriersUpTo) << ' :' << (int )std::get<1 >(*SkipBarriersUpTo))
561
621
: " <none>" ) << Endl
562
622
<< " SkipBlobsUpTo# " << (SkipBlobsUpTo ? SkipBlobsUpTo->ToString () : " <none>" );
563
623
}
564
624
625
+ void TAssimilator::UpdateBytesCopiedQ () {
626
+ while (BytesCopiedQ.size () >= 3 ) {
627
+ BytesCopiedQ.pop_front ();
628
+ }
629
+ BytesCopiedQ.emplace_back (TActivationContext::Monotonic (), Self->Assimilator .BytesCopied );
630
+
631
+ Self->Assimilator .CopySpeed = 0 ;
632
+ Self->Assimilator .CopyTimeRemaining = TDuration::Max ();
633
+
634
+ if (BytesCopiedQ.size () > 1 ) {
635
+ const auto & [frontTs, frontBytes] = BytesCopiedQ.front ();
636
+ const auto & [backTs, backBytes] = BytesCopiedQ.back ();
637
+ const TDuration deltaTs = backTs - frontTs;
638
+ const ui64 deltaBytes = backBytes - frontBytes;
639
+ if (deltaTs != TDuration::Zero ()) {
640
+ Self->Assimilator .CopySpeed = deltaBytes * 1'000'000 / deltaTs.MicroSeconds ();
641
+ }
642
+ if (deltaBytes) {
643
+ Self->Assimilator .CopyTimeRemaining = TDuration::MicroSeconds (Self->Assimilator .BytesToCopy *
644
+ deltaTs.MicroSeconds () / deltaBytes);
645
+ }
646
+ }
647
+
648
+ TActivationContext::Schedule (TDuration::Seconds (1 ), new IEventHandle (TEvPrivate::EvUpdateBytesCopiedQ, 0 ,
649
+ SelfId (), {}, nullptr , 0 ));
650
+ }
651
+
565
652
void TBlobDepot::TData::ExecuteTxCommitAssimilatedBlob (NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId,
566
653
TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) {
567
654
Self->Execute (std::make_unique<TTxCommitAssimilatedBlob>(Self, status, blobSeqId, std::move (key),
0 commit comments