Skip to content

Commit 0e41d1c

Browse files
alexvruvporyadke
authored and committed
Fix scrubbing and flapping unittest (ydb-platform#1640)
1 parent c12cb99 commit 0e41d1c

11 files changed

+217
-96
lines changed

ydb/core/blobstorage/backpressure/queue.h

+4
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,10 @@ class TBlobStorageQueue {
180180
return Queues.InFlight.size();
181181
}
182182

183+
ui64 GetInFlightCost() const {
184+
return InFlightCost;
185+
}
186+
183187
void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
184188
const TBlobStorageGroupType& type);
185189
void InvalidateCosts();

ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp

+5-1
Original file line number | Diff line number | Diff line change
@@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
365365
<< " msgId# " << msgId << " sequenceId# " << sequenceId
366366
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
367367
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
368-
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
368+
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
369+
<< " InFlightCost# " << Queue.GetInFlightCost()
370+
<< " InFlightCount# " << Queue.InFlightCount()
371+
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
372+
<< " BytesWaiting# " << Queue.GetBytesWaiting());
369373

370374
switch (ws) {
371375
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:

ydb/core/blobstorage/backpressure/queue_backpressure_server.h

+1-3
Original file line number | Diff line number | Diff line change
@@ -196,9 +196,7 @@ namespace NKikimr {
196196
}
197197
}
198198

199-
TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
200-
Y_UNUSED(checkMsgId);
201-
Y_UNUSED(msgId);
199+
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
202200
Y_ABORT_UNLESS(Cost >= cost);
203201
Cost -= cost;
204202
--InFlight;

ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp

+3-2
Original file line number | Diff line number | Diff line change
@@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
293293
msg->SetId(ReaderTabletData->Id);
294294
msg->SetGeneration(ReaderTabletData->Generation);
295295
}
296-
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
297-
<< " vget# " << vget->ToString());
298296
}
299297

300298
for (auto& vget : gets) {
301299
if (vget) {
300+
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
301+
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
302+
<< " vget# " << vget->ToString());
302303
outVGets.push_back(std::move(vget));
303304
++RequestIndex;
304305
}

ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp

+9
Original file line number | Diff line number | Diff line change
@@ -209,6 +209,11 @@ struct TPDiskMockState::TImpl {
209209
}
210210
}
211211

212+
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
213+
const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize;
214+
return static_cast<bool>(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end});
215+
}
216+
212217
std::set<ui32> GetChunks() {
213218
std::set<ui32> res;
214219
for (auto& [ownerId, owner] : Owners) {
@@ -293,6 +298,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool
293298
Impl->SetCorruptedArea(chunkIdx, begin, end, enabled);
294299
}
295300

301+
bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
302+
return Impl->HasCorruptedArea(chunkIdx, begin, end);
303+
}
304+
296305
std::set<ui32> TPDiskMockState::GetChunks() {
297306
return Impl->GetChunks();
298307
}

ydb/core/blobstorage/pdisk/mock/pdisk_mock.h

+1
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ namespace NKikimr {
2727
~TPDiskMockState();
2828

2929
void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled);
30+
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end);
3031
std::set<ui32> GetChunks();
3132
TMaybe<NPDisk::TOwnerRound> GetOwnerRound(const TVDiskID& vDiskId) const;
3233
ui32 GetChunkSize() const;

ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp

+109-43
Original file line number | Diff line number | Diff line change
@@ -16,64 +16,130 @@ void Test() {
1616

1717
TString data = TString::Uninitialized(8_MB);
1818
memset(data.Detach(), 'X', data.size());
19-
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);
20-
21-
{ // write data to group
22-
TActorId sender = runtime->AllocateEdgeActor(1);
23-
runtime->WrapInActorContext(sender, [&] {
24-
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
25-
});
26-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
27-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
28-
}
2919

30-
auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
31-
TActorId sender = runtime->AllocateEdgeActor(1);
32-
runtime->WrapInActorContext(sender, [&] {
33-
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
34-
NKikimrBlobStorage::EGetHandleClass::FastRead));
35-
});
36-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
37-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
38-
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
39-
auto& r = res->Get()->Responses[0];
40-
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
41-
if (status == NKikimrProto::OK) {
42-
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
20+
for (ui32 step = 1; step < 100; ++step) {
21+
TLogoBlobID id(1, 1, step, 0, data.size(), 0);
22+
23+
{ // write data to group
24+
TActorId sender = runtime->AllocateEdgeActor(1);
25+
runtime->WrapInActorContext(sender, [&] {
26+
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
27+
});
28+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
29+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
4330
}
44-
};
4531

46-
checkReadable(NKikimrProto::OK);
32+
auto checkReadable = [&] {
33+
TActorId sender = runtime->AllocateEdgeActor(1);
34+
runtime->WrapInActorContext(sender, [&] {
35+
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
36+
NKikimrBlobStorage::EGetHandleClass::FastRead));
37+
});
38+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
39+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
40+
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
41+
auto& r = res->Get()->Responses[0];
42+
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
43+
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
44+
45+
ui32 partsMask = 0;
46+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
47+
const TVDiskID& vdiskId = info->GetVDiskId(i);
48+
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
49+
const TActorId sender = runtime->AllocateEdgeActor(1);
50+
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
51+
NKikimrBlobStorage::EGetHandleClass::FastRead);
52+
ev->AddExtremeQuery(id, 0, 0);
53+
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
54+
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
55+
auto& record = reply->Get()->Record;
56+
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
57+
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
58+
for (const auto& result : record.GetResult()) {
59+
if (result.GetStatus() == NKikimrProto::OK) {
60+
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
61+
UNIT_ASSERT(id.PartId());
62+
const ui32 partIdx = id.PartId() - 1;
63+
const ui32 mask = 1 << partIdx;
64+
UNIT_ASSERT(!(partsMask & mask));
65+
partsMask |= mask;
66+
} else {
67+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
68+
}
69+
}
70+
});
71+
}
72+
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
73+
};
4774

48-
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
49-
const TActorId vdiskActorId = info->GetActorId(i);
75+
checkReadable();
5076

51-
ui32 nodeId, pdiskId;
52-
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
53-
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
54-
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
77+
ui32 mask = 0;
5578

56-
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
57-
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
58-
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
79+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
80+
const TActorId vdiskActorId = info->GetActorId(i);
5981

60-
for (auto& item : res->Get()->Layout) {
61-
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
62-
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
63-
const TDiskPart& part = item.Location;
64-
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
65-
break;
82+
ui32 nodeId, pdiskId;
83+
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
84+
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
85+
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
86+
87+
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
88+
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
89+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
90+
91+
for (auto& item : res->Get()->Layout) {
92+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
93+
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
94+
const TDiskPart& part = item.Location;
95+
mask |= 1 << i;
96+
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
97+
break;
98+
}
6699
}
100+
101+
checkReadable();
67102
}
68103

69-
checkReadable(NKikimrProto::OK);
70-
}
104+
env.Sim(TDuration::Seconds(60));
71105

72-
env.Sim(TDuration::Seconds(60));
106+
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
107+
if (~mask >> i & 1) {
108+
continue;
109+
}
110+
111+
const TActorId vdiskActorId = info->GetActorId(i);
112+
113+
ui32 nodeId, pdiskId;
114+
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
115+
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
116+
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
117+
118+
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
119+
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
120+
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
121+
122+
bool anyPartReadable = false;
123+
124+
for (auto& item : res->Get()->Layout) {
125+
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
126+
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
127+
const TDiskPart& part = item.Location;
128+
anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size);
129+
if (anyPartReadable) {
130+
break;
131+
}
132+
}
133+
}
134+
135+
UNIT_ASSERT(anyPartReadable);
136+
}
137+
}
73138
}
74139

75140
Y_UNIT_TEST_SUITE(ScrubFast) {
76141
Y_UNIT_TEST(SingleBlob) {
77142
Test();
78143
}
79144
}
145+

ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -99,15 +99,15 @@ namespace NKikimr {
9999

100100
// a map to fill upon receiving VGet result
101101
struct TPerBlobInfo {
102-
const TInstant Deadline;
103102
std::weak_ptr<TInFlightContext> Context;
104103
TEvRecoverBlobResult::TItem *Item; // item to update
105104
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
106105
};
107106
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
107+
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;
108108

109109
void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
110-
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
110+
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
111111
void SendPendingQueries();
112112
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
113113
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);

0 commit comments

Comments
 (0)