Skip to content

Commit d32d5d0

Browse files
authored
Improve replication code (#6501)
1 parent 456c93b commit d32d5d0

File tree

3 files changed

+54
-24
lines changed

3 files changed

+54
-24
lines changed

ydb/core/blobstorage/vdisk/repl/blobstorage_hullrepljob.cpp

+11-17
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,8 @@ namespace NKikimr {
5858
Recipient = parentId;
5959

6060
// count unreplicated so far blobs in this work too
61-
for (const TLogoBlobID& id : *UnreplicatedBlobsPtr) {
62-
ReplInfo->WorkUnitsTotal += id.BlobSize();
63-
}
64-
ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->size();
61+
ReplInfo->WorkUnitsTotal += UnreplicatedBlobsPtr->GetNumWorkUnits();
62+
ReplInfo->ItemsTotal += UnreplicatedBlobsPtr->GetNumItems();
6563

6664
// prepare the recovery machine
6765
RecoveryMachine = std::make_unique<TRecoveryMachine>(ReplCtx, ReplInfo);
@@ -88,12 +86,12 @@ namespace NKikimr {
8886

8987
if (BlobsToReplicatePtr) {
9088
// iterate over queue items and match them with iterator
91-
for (; !BlobsToReplicatePtr->empty() && AddingTasks; BlobsToReplicatePtr->pop_front()) {
89+
for (; !BlobsToReplicatePtr->IsEmpty() && AddingTasks; BlobsToReplicatePtr->PopFront()) {
9290
if (++counter % 1024 == 0 && GetCycleCountFast() >= plannedEndTime) {
9391
Send(ReplCtx->SkeletonId, new TEvTakeHullSnapshot(true));
9492
return;
9593
} else {
96-
const TLogoBlobID& key = BlobsToReplicatePtr->front();
94+
const TLogoBlobID& key = BlobsToReplicatePtr->Front();
9795
it.Seek(key);
9896
const bool processed = it.Valid() && it.GetCurKey().LogoBlobID() == key &&
9997
ProcessItem(it, *barriers, allowKeepFlags);
@@ -102,13 +100,9 @@ namespace NKikimr {
102100
}
103101
}
104102
}
105-
if (!AddingTasks) {
106-
for (const TLogoBlobID& key : *BlobsToReplicatePtr) {
107-
ReplInfo->WorkUnitsTotal += key.BlobSize();
108-
}
109-
ReplInfo->ItemsTotal += BlobsToReplicatePtr->size();
110-
}
111-
eof = BlobsToReplicatePtr->empty();
103+
ReplInfo->WorkUnitsTotal += BlobsToReplicatePtr->GetNumWorkUnits();
104+
ReplInfo->ItemsTotal += BlobsToReplicatePtr->GetNumItems();
105+
eof = BlobsToReplicatePtr->IsEmpty();
112106
} else {
113107
// scan through the index until we have enough blobs to recover or the time is out
114108
const TBlobStorageGroupInfo::TTopology& topology = *ReplCtx->VCtx->Top;
@@ -360,8 +354,8 @@ namespace NKikimr {
360354
(ReplItemsRemaining, (ui64)mon.ReplItemsRemaining()),
361355
(LastKey, LastKey),
362356
(Eof, Eof),
363-
(BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->size() : (ssize_t)-1)),
364-
(UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->size()));
357+
(BlobsToReplicatePtr.size, ssize_t(BlobsToReplicatePtr ? BlobsToReplicatePtr->GetNumItems() : (ssize_t)-1)),
358+
(UnreplicatedBlobsPtr.size, UnreplicatedBlobsPtr->GetNumItems()));
365359
}
366360

367361
mon.ReplWorkUnitsRemaining() = ReplInfo->WorkUnitsTotal;
@@ -651,7 +645,7 @@ namespace NKikimr {
651645
(RecoveryQueueSize, RecoveryQueue.size()));
652646

653647
// sort unreplicated blobs vector as it may contain records in incorrect order due to phantom checking
654-
std::sort(UnreplicatedBlobsPtr->begin(), UnreplicatedBlobsPtr->end());
648+
UnreplicatedBlobsPtr->Sort();
655649
return true;
656650
}
657651

@@ -762,7 +756,7 @@ namespace NKikimr {
762756
} else if (record.LooksLikePhantom) {
763757
++ReplCtx->MonGroup.ReplPhantomBlobsWithProblems();
764758
}
765-
UnreplicatedBlobsPtr->push_back(item.Id);
759+
UnreplicatedBlobsPtr->Push(item.Id);
766760
}
767761

768762
void DropUnreplicatedBlobRecord(const TLogoBlobID& id) {

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -367,15 +367,15 @@ namespace NKikimr {
367367
BlobsToReplicatePtr = std::exchange(UnreplicatedBlobsPtr, std::make_shared<TBlobIdQueue>());
368368

369369
#ifndef NDEBUG
370-
Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->size() == UnreplicatedBlobRecords.size(),
371-
"BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->size()
370+
Y_VERIFY_DEBUG_S(BlobsToReplicatePtr->GetNumItems() == UnreplicatedBlobRecords.size(),
371+
"BlobsToReplicatePtr->size# " << BlobsToReplicatePtr->GetNumItems()
372372
<< " UnreplicatedBlobRecords.size# " << UnreplicatedBlobRecords.size());
373-
for (const TLogoBlobID& id : *BlobsToReplicatePtr) {
373+
for (const TLogoBlobID& id : BlobsToReplicatePtr->Queue) {
374374
Y_DEBUG_ABORT_UNLESS(UnreplicatedBlobRecords.contains(id));
375375
}
376376
#endif
377377

378-
if (BlobsToReplicatePtr->empty()) {
378+
if (BlobsToReplicatePtr->IsEmpty()) {
379379
// no more blobs to replicate -- consider replication finished
380380
finished = true;
381381
for (const auto& donor : std::exchange(DonorQueue, {})) {
@@ -402,7 +402,7 @@ namespace NKikimr {
402402

403403
if (finished) {
404404
STLOG(PRI_DEBUG, BS_REPL, BSVR17, VDISKP(ReplCtx->VCtx->VDiskLogPrefix, "REPL COMPLETED"),
405-
(BlobsToReplicate, BlobsToReplicatePtr->size()));
405+
(BlobsToReplicate, BlobsToReplicatePtr->GetNumItems()));
406406
LastReplEnd = now;
407407

408408
if (State == WaitQueues || State == Replication) {
@@ -412,7 +412,7 @@ namespace NKikimr {
412412
ResetReplProgressTimer(true);
413413

414414
Become(&TThis::StateRelax);
415-
if (!BlobsToReplicatePtr->empty()) {
415+
if (!BlobsToReplicatePtr->IsEmpty()) {
416416
// try again for unreplicated blobs in some future
417417
State = Relaxation;
418418
Schedule(ReplCtx->VDiskCfg->ReplTimeInterval, new TEvents::TEvWakeup);

ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h

+37-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,43 @@ namespace NKikimr {
1111
struct TProxyStat;
1212
};
1313

14-
using TBlobIdQueue = std::deque<TLogoBlobID>;
14+
struct TBlobIdQueue {
15+
std::deque<TLogoBlobID> Queue;
16+
ui64 WorkUnits = 0;
17+
18+
void Push(const TLogoBlobID& id) {
19+
WorkUnits += id.BlobSize();
20+
Queue.push_back(id);
21+
}
22+
23+
void PopFront() {
24+
WorkUnits -= Queue.front().BlobSize();
25+
Queue.pop_front();
26+
}
27+
28+
bool IsEmpty() const {
29+
return Queue.empty();
30+
}
31+
32+
TLogoBlobID Front() const {
33+
return Queue.front();
34+
}
35+
36+
size_t GetNumItems() const {
37+
return Queue.size();
38+
}
39+
40+
ui64 GetNumWorkUnits() const {
41+
return WorkUnits;
42+
}
43+
44+
void Sort() {
45+
if (!std::is_sorted(Queue.begin(), Queue.end())) {
46+
std::sort(Queue.begin(), Queue.end());
47+
}
48+
}
49+
};
50+
1551
using TBlobIdQueuePtr = std::shared_ptr<TBlobIdQueue>;
1652

1753
struct TUnreplicatedBlobRecord { // for monitoring purposes

0 commit comments

Comments
 (0)