@@ -26,13 +26,15 @@ namespace NKikimr {
26
26
TLogoBlobID LastKey;
27
27
bool Eof;
28
28
std::deque<TLogoBlobID> DroppedBlobs;
29
+ TMilestoneQueue MilestoneQueue;
29
30
30
31
TEvReplPlanFinished (std::unique_ptr<TRecoveryMachine>&& recoveryMachine, const TLogoBlobID& lastKey, bool eof,
31
- std::deque<TLogoBlobID>&& droppedBlobs)
32
+ std::deque<TLogoBlobID>&& droppedBlobs, TMilestoneQueue&& milestoneQueue )
32
33
: RecoveryMachine(std::move(recoveryMachine))
33
34
, LastKey(lastKey)
34
35
, Eof(eof)
35
36
, DroppedBlobs(std::move(droppedBlobs))
37
+ , MilestoneQueue(std::move(milestoneQueue))
36
38
{}
37
39
};
38
40
@@ -52,6 +54,7 @@ namespace NKikimr {
52
54
std::deque<TLogoBlobID> DroppedBlobs;
53
55
ui64 QuantumBytes = 0 ;
54
56
bool AddingTasks = true ;
57
+ TMilestoneQueue MilestoneQueue;
55
58
56
59
public:
57
60
void Bootstrap (const TActorId& parentId) {
@@ -106,43 +109,83 @@ namespace NKikimr {
106
109
} else {
107
110
// scan through the index until we have enough blobs to recover or the time is out
108
111
const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx ->Top ;
109
- for (it.Seek (StartKey); it.Valid (); it.Next ()) {
110
- StartKey = it.GetCurKey ().LogoBlobID ();
112
+
113
+ if (StartKey == TLogoBlobID ()) {
114
+ it.SeekToFirst ();
115
+ } else {
116
+ it.Seek (StartKey);
117
+ if (it.Valid () && it.GetCurKey () == StartKey) {
118
+ it.Next ();
119
+ }
120
+ }
121
+
122
+ auto checkRestart = [&] {
111
123
if (++counter % 1024 == 0 && GetCycleCountFast () >= plannedEndTime) {
112
124
// we have event processing timer expired, restart processing later with new snapshot starting
113
125
// with current key
114
126
Send (ReplCtx->SkeletonId , new TEvTakeHullSnapshot (true ));
115
- return ;
116
- } else if (AddingTasks) {
127
+ return true ;
128
+ }
129
+ return false ;
130
+ };
131
+
132
+ if (AddingTasks) {
133
+ for (; it.Valid (); it.Next ()) {
134
+ if (checkRestart ()) {
135
+ return ;
136
+ }
137
+
138
+ StartKey = it.GetCurKey ().LogoBlobID ();
139
+
117
140
// we still have some space in recovery machine logic, so we can add new item
118
141
ProcessItem (it, *barriers, allowKeepFlags);
119
- } else {
120
- // no space in recovery machine logic, but we still have to count remaining work
121
- const TMemRecLogoBlob memRec = it.GetMemRec ();
122
- const TIngress ingress = memRec.GetIngress ();
123
- const auto parts = ingress.PartsWeMustHaveLocally (&topology, ReplCtx->VCtx ->ShortSelfVDisk ,
124
- StartKey) - ingress.LocalParts (topology.GType );
125
- if (!parts.Empty () && barriers->Keep (StartKey, memRec, {}, allowKeepFlags,
126
- true /* allowGarbageCollection*/ ).KeepData ) {
127
- ++ReplInfo->ItemsTotal ;
128
- ReplInfo->WorkUnitsTotal += StartKey.BlobSize ();
129
- }
142
+ MilestoneQueue.PopIfNeeded (StartKey);
130
143
131
- if (!KeyToResumeNextTime) {
132
- // this is first valid key that is not processed with ProcessItem, so we remember it to
133
- // start next quantum with this exact key
134
- KeyToResumeNextTime.emplace (StartKey);
144
+ if (!AddingTasks) { // we have finished adding tasks after this key, remember it
145
+ it.Next ();
146
+ Y_ABORT_UNLESS (!KeyToResumeNextTime);
147
+ if (it.Valid ()) {
148
+ KeyToResumeNextTime.emplace (it.GetCurKey ().LogoBlobID ());
149
+ }
150
+ break ;
135
151
}
136
152
}
153
+ eof = !it.Valid (); // we finish this quantum when there are no more tasks to generate
137
154
}
138
155
139
- // we shall run next quantum only if we have KeyToResumeNextTime filled in
140
- eof = !KeyToResumeNextTime;
156
+ for (; it.Valid (); it.Next ()) {
157
+ if (checkRestart ()) {
158
+ return ;
159
+ }
160
+
161
+ StartKey = it.GetCurKey ().LogoBlobID ();
162
+
163
+ // check the milestone queue, if we have requested blob
164
+ if (MilestoneQueue.Match (StartKey, &ReplInfo->ItemsTotal , &ReplInfo->WorkUnitsTotal )) {
165
+ break ;
166
+ }
167
+
168
+ // no space in recovery machine logic, but we still have to count remaining work
169
+ const TMemRecLogoBlob memRec = it.GetMemRec ();
170
+ const TIngress ingress = memRec.GetIngress ();
171
+ const auto parts = ingress.PartsWeMustHaveLocally (&topology, ReplCtx->VCtx ->ShortSelfVDisk ,
172
+ StartKey) - ingress.LocalParts (topology.GType );
173
+ if (!parts.Empty () && barriers->Keep (StartKey, memRec, {}, allowKeepFlags,
174
+ true /* allowGarbageCollection*/ ).KeepData ) {
175
+ ++ReplInfo->ItemsTotal ;
176
+ ReplInfo->WorkUnitsTotal += StartKey.BlobSize ();
177
+ MilestoneQueue.Push (StartKey, StartKey.BlobSize ());
178
+ }
179
+ }
180
+
181
+ if (!it.Valid ()) {
182
+ MilestoneQueue.Finish ();
183
+ }
141
184
}
142
185
143
186
// the planning stage has finished, issue reply to the job actor
144
187
Send (Recipient, new TEvReplPlanFinished (std::move (RecoveryMachine), KeyToResumeNextTime.value_or (TLogoBlobID ()),
145
- eof, std::move (DroppedBlobs)));
188
+ eof, std::move (DroppedBlobs), std::move (MilestoneQueue) ));
146
189
147
190
// finish processing for this actor
148
191
PassAway ();
@@ -219,13 +262,15 @@ namespace NKikimr {
219
262
const TLogoBlobID &startKey,
220
263
TEvReplFinished::TInfoPtr replInfo,
221
264
TBlobIdQueuePtr blobsToReplicatePtr,
222
- TBlobIdQueuePtr unreplicatedBlobsPtr)
265
+ TBlobIdQueuePtr unreplicatedBlobsPtr,
266
+ TMilestoneQueue milestoneQueue)
223
267
: ReplCtx(std::move(replCtx))
224
268
, GInfo(std::move(ginfo))
225
269
, StartKey(startKey)
226
270
, ReplInfo(replInfo)
227
271
, BlobsToReplicatePtr(std::move(blobsToReplicatePtr))
228
272
, UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr))
273
+ , MilestoneQueue(std::move(milestoneQueue))
229
274
{}
230
275
};
231
276
@@ -265,6 +310,7 @@ namespace NKikimr {
265
310
TBlobIdQueuePtr BlobsToReplicatePtr;
266
311
TBlobIdQueuePtr UnreplicatedBlobsPtr;
267
312
TUnreplicatedBlobRecords UnreplicatedBlobRecords;
313
+ TMilestoneQueue MilestoneQueue;
268
314
std::optional<std::pair<TVDiskID, TActorId>> Donor;
269
315
270
316
// parameters from planner
@@ -305,7 +351,7 @@ namespace NKikimr {
305
351
for (const auto & proxy : DiskProxySet) {
306
352
dropDonor = dropDonor && proxy && proxy->NoTransientErrors ();
307
353
}
308
- ReplInfo->Finish (LastKey, Eof, Donor && dropDonor, std::move (UnreplicatedBlobRecords));
354
+ ReplInfo->Finish (LastKey, Eof, Donor && dropDonor, std::move (UnreplicatedBlobRecords), std::move (MilestoneQueue) );
309
355
310
356
TProxyStat stat;
311
357
for (const TVDiskProxyPtr& p : DiskProxySet) {
@@ -326,7 +372,8 @@ namespace NKikimr {
326
372
STLOG (PRI_DEBUG, BS_REPL, BSVR02, VDISKP (ReplCtx->VCtx ->VDiskLogPrefix , " THullReplJobActor::Bootstrap" ));
327
373
328
374
TimeAccount.SetState (ETimeState::PREPARE_PLAN);
329
- auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr, UnreplicatedBlobsPtr);
375
+ auto actor = std::make_unique<THullReplPlannerActor>(ReplCtx, GInfo, StartKey, ReplInfo, BlobsToReplicatePtr,
376
+ UnreplicatedBlobsPtr, std::move (MilestoneQueue));
330
377
auto aid = RunInBatchPool (ActorContext (), actor.release ());
331
378
ActiveActors.Insert (aid, __FILE__, __LINE__, TActivationContext::AsActorContext (), NKikimrServices::BLOBSTORAGE);
332
379
Become (&TThis::StatePreparePlan);
@@ -338,6 +385,7 @@ namespace NKikimr {
338
385
RecoveryMachine = std::move (ev->Get ()->RecoveryMachine );
339
386
LastKey = ev->Get ()->LastKey ;
340
387
Eof = ev->Get ()->Eof ;
388
+ MilestoneQueue = std::move (ev->Get ()->MilestoneQueue );
341
389
342
390
for (const TLogoBlobID& id : ev->Get ()->DroppedBlobs ) {
343
391
DropUnreplicatedBlobRecord (id);
@@ -940,7 +988,8 @@ namespace NKikimr {
940
988
TBlobIdQueuePtr&& blobsToReplicatePtr,
941
989
TBlobIdQueuePtr&& unreplicatedBlobsPtr,
942
990
const std::optional<std::pair<TVDiskID, TActorId>>& donor,
943
- TUnreplicatedBlobRecords&& ubr)
991
+ TUnreplicatedBlobRecords&& ubr,
992
+ TMilestoneQueue&& milestoneQueue)
944
993
: TActorBootstrapped<THullReplJobActor>()
945
994
, ReplCtx(std::move(replCtx))
946
995
, GInfo(ReplCtx->GInfo) // it is safe to take it here
@@ -956,6 +1005,7 @@ namespace NKikimr {
956
1005
, BlobsToReplicatePtr(std::move(blobsToReplicatePtr))
957
1006
, UnreplicatedBlobsPtr(std::move(unreplicatedBlobsPtr))
958
1007
, UnreplicatedBlobRecords(std::move(ubr))
1008
+ , MilestoneQueue(std::move(milestoneQueue))
959
1009
, Donor(donor)
960
1010
{
961
1011
if (Donor) {
@@ -987,10 +1037,12 @@ namespace NKikimr {
987
1037
TBlobIdQueuePtr blobsToReplicatePtr,
988
1038
TBlobIdQueuePtr unreplicatedBlobsPtr,
989
1039
const std::optional<std::pair<TVDiskID, TActorId>>& donor,
990
- TUnreplicatedBlobRecords&& ubr)
1040
+ TUnreplicatedBlobRecords&& ubr,
1041
+ TMilestoneQueue&& milestoneQueue)
991
1042
{
992
1043
return new THullReplJobActor (std::move (replCtx), parentId, startKey, std::move (queueActorMapPtr),
993
- std::move (blobsToReplicatePtr), std::move (unreplicatedBlobsPtr), donor, std::move (ubr));
1044
+ std::move (blobsToReplicatePtr), std::move (unreplicatedBlobsPtr), donor, std::move (ubr),
1045
+ std::move (milestoneQueue));
994
1046
}
995
1047
996
1048
} // NKikimr
0 commit comments