Skip to content

Commit faa5f83

Browse files
authored
Fix shred interface in PDisk (#13926)
1 parent 30de712 commit faa5f83

File tree

4 files changed

+30
-23
lines changed

4 files changed

+30
-23
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,13 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
6565
ui32 SlotId = 0;
6666
bool IsShred = false;
6767
ui64 ShredGeneration = 0;
68+
ui64 Cookie;
6869

69-
TInitQueueItem(const TActorId sender, const ui64 shredGeneration)
70+
TInitQueueItem(const TActorId sender, const ui64 shredGeneration, ui64 cookie)
7071
: Sender(sender)
7172
, IsShred(true)
7273
, ShredGeneration(shredGeneration)
74+
, Cookie(cookie)
7375
{}
7476

7577
TInitQueueItem(TOwnerRound ownerRound, TVDiskID vDisk, ui64 pDiskGuid, TActorId sender, TActorId cutLogId,
@@ -299,7 +301,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
299301
Become(&TThis::StateError);
300302
for (TList<TInitQueueItem>::iterator it = InitQueue.begin(); it != InitQueue.end(); ++it) {
301303
if (it->IsShred) {
302-
Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason));
304+
Send(it->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, it->ShredGeneration, errorReason), 0, it->Cookie);
303305
if (PDisk) {
304306
PDisk->Mon.ShredPDisk.CountResponse();
305307
}
@@ -617,6 +619,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
617619
if (it->IsShred) {
618620
NPDisk::TEvShredPDisk evShredPDisk(it->ShredGeneration);
619621
auto* request = PDisk->ReqCreator.CreateFromEv<NPDisk::TShredPDisk>(evShredPDisk, it->Sender);
622+
request->Cookie = it->Cookie;
620623
PDisk->InputRequest(request);
621624
} else {
622625
NPDisk::TEvYardInit evInit(it->OwnerRound, it->VDisk, it->PDiskGuid, it->CutLogId, it->WhiteboardProxyId,
@@ -667,7 +670,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
667670

668671
void InitHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
669672
const NPDisk::TEvShredPDisk &evShredPDisk = *ev->Get();
670-
InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration);
673+
InitQueue.emplace_back(ev->Sender, evShredPDisk.ShredGeneration, ev->Cookie);
671674
}
672675

673676
void InitHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
@@ -843,7 +846,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
843846

844847
void ErrorHandle(NPDisk::TEvShredPDisk::TPtr &ev) {
845848
// Respond with error, can't shred in this state.
846-
Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason));
849+
Send(ev->Sender, new NPDisk::TEvShredPDiskResult(NKikimrProto::CORRUPTED, 0, StateErrorReason), 0, ev->Cookie);
847850
}
848851

849852
void ErrorHandle(NPDisk::TEvPreShredCompactVDiskResult::TPtr &ev) {
@@ -1060,6 +1063,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
10601063

10611064
void Handle(NPDisk::TEvShredPDisk::TPtr &ev) {
10621065
auto* request = PDisk->ReqCreator.CreateFromEv<TShredPDisk>(*ev->Get(), ev->Sender);
1066+
request->Cookie = ev->Cookie;
10631067
PDisk->InputRequest(request);
10641068
}
10651069

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3935,7 +3935,7 @@ void TPDisk::ProgressShredState() {
39353935
<< " sends compact request to VDisk# " << data.VDiskId
39363936
<< " ownerId# " << ownerId
39373937
<< " request# " << compactRequest->ToString());
3938-
PCtx->ActorSystem->Send(data.CutLogId, compactRequest.Release());
3938+
PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, compactRequest.Release()));
39393939
data.LastShredGeneration = ShredGeneration;
39403940
data.ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_REQUESTED;
39413941
}
@@ -3984,7 +3984,7 @@ void TPDisk::ProgressShredState() {
39843984
<< " sends shred request to VDisk# " << data.VDiskId
39853985
<< " ownerId# " << ownerId
39863986
<< " request# " << shredRequest->ToString());
3987-
PCtx->ActorSystem->Send(data.CutLogId, shredRequest.Release());
3987+
PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
39883988
data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
39893989
data.LastShredGeneration = ShredGeneration;
39903990
}
@@ -4011,8 +4011,9 @@ void TPDisk::ProgressShredState() {
40114011
LOG_NOTICE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
40124012
"Shred request is finished at PDisk# " << PCtx->PDiskId
40134013
<< " ShredGeneration# " << ShredGeneration);
4014-
for (TActorId &requester : ShredRequesters) {
4015-
PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::OK, ShredGeneration, ""));
4014+
for (auto& [requester, cookie] : ShredRequesters) {
4015+
PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
4016+
NKikimrProto::OK, ShredGeneration, ""), 0, cookie));
40164017
}
40174018
ShredRequesters.clear();
40184019
}
@@ -4029,27 +4030,28 @@ void TPDisk::ProcessShredPDisk(TShredPDisk& request) {
40294030
TGuard<TMutex> guard(StateMutex);
40304031
if (request.ShredGeneration < ShredGeneration) {
40314032
guard.Release();
4032-
PCtx->ActorSystem->Send(request.Sender,
4033-
new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration,
4034-
"A shred request with a higher generation is already in progress"));
4033+
PCtx->ActorSystem->Send(new IEventHandle(request.Sender, PCtx->PDiskActor, new TEvShredPDiskResult(
4034+
NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is already in progress"),
4035+
0, request.Cookie));
40354036
return;
40364037
}
40374038
if (request.ShredGeneration == ShredGeneration) {
40384039
// Do nothing, since we already have a shred request with the same generation.
40394040
// Just add the sender to the list of requesters.
4040-
ShredRequesters.push_back(request.Sender);
4041+
ShredRequesters.emplace_back(request.Sender, request.Cookie);
40414042
return;
40424043
}
40434044
// ShredGeneration > request.ShredGeneration
40444045
if (ShredRequesters.size() > 0) {
4045-
for (TActorId &requester : ShredRequesters) {
4046-
PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::RACE, request.ShredGeneration,
4047-
"A shred request with a higher generation is received"));
4046+
for (auto& [requester, cookie] : ShredRequesters) {
4047+
PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
4048+
NKikimrProto::RACE, request.ShredGeneration, "A shred request with a higher generation is received"), 0,
4049+
cookie));
40484050
}
40494051
ShredRequesters.clear();
40504052
}
40514053
ShredGeneration = request.ShredGeneration;
4052-
ShredRequesters.push_back(request.Sender);
4054+
ShredRequesters.emplace_back(request.Sender, request.Cookie);
40534055
ShredState = EShredStateSendPreShredCompactVDisk;
40544056
ProgressShredState();
40554057
}
@@ -4094,7 +4096,7 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ
40944096
}
40954097
if (request.Status != NKikimrProto::OK) {
40964098
ShredState = EShredStateFailed;
4097-
for (TActorId &requester : ShredRequesters) {
4099+
for (auto& [requester, cookie] : ShredRequesters) {
40984100
TStringStream str;
40994101
str << "Shred request failed at PDisk# " << PCtx->PDiskId
41004102
<< " for shredGeneration# " << request.ShredGeneration
@@ -4103,8 +4105,8 @@ void TPDisk::ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& requ
41034105
<< " replied with PreShredCompactVDiskResult status# " << request.Status
41044106
<< " and ErrorReason# " << request.ErrorReason;
41054107
LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str());
4106-
PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration,
4107-
str.Str()));
4108+
PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
4109+
NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie));
41084110
}
41094111
ShredRequesters.clear();
41104112
return;
@@ -4153,7 +4155,7 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
41534155
}
41544156
if (request.Status != NKikimrProto::OK) {
41554157
ShredState = EShredStateFailed;
4156-
for (TActorId &requester : ShredRequesters) {
4158+
for (auto& [requester, cookie] : ShredRequesters) {
41574159
TStringStream str;
41584160
str << "Shred request failed at PDisk# " << PCtx->PDiskId
41594161
<< " for shredGeneration# " << request.ShredGeneration
@@ -4162,8 +4164,8 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
41624164
<< " replied with status# " << request.Status
41634165
<< " and ErrorReason# " << request.ErrorReason;
41644166
LOG_ERROR_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, str.Str());
4165-
PCtx->ActorSystem->Send(requester, new TEvShredPDiskResult(NKikimrProto::ERROR, request.ShredGeneration,
4166-
str.Str()));
4167+
PCtx->ActorSystem->Send(new IEventHandle(requester, PCtx->PDiskActor, new TEvShredPDiskResult(
4168+
NKikimrProto::ERROR, request.ShredGeneration, str.Str()), 0, cookie));
41674169
}
41684170
ShredRequesters.clear();
41694171
return;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class TPDisk : public IPDisk {
157157
};
158158
EShredState ShredState = EShredStateDefault;
159159
ui64 ShredGeneration = 0;
160-
std::deque<TActorId> ShredRequesters;
160+
std::deque<std::tuple<TActorId, ui64>> ShredRequesters;
161161

162162
// Chunks that are owned by killed owner, but have operations InFlight
163163
TVector<TChunkIdx> QuarantineChunks;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,7 @@ class TContinueReadMetadata : public TRequestBase {
11441144
class TShredPDisk : public TRequestBase {
11451145
public:
11461146
ui64 ShredGeneration;
1147+
ui64 Cookie;
11471148

11481149
TShredPDisk(NPDisk::TEvShredPDisk& ev, TActorId sender, TAtomicBase reqIdx)
11491150
: TRequestBase(sender, TReqId(TReqId::ShredPDisk, reqIdx), OwnerSystem, 0, NPriInternal::Other)

0 commit comments

Comments
 (0)