Skip to content

Add deadlines for each stage of patching #5677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 200 additions & 14 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp

Large diffs are not rendered by default.

91 changes: 71 additions & 20 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ enum class ENaivePatchCase {
ErrorOnPut,
};

// Expands to a `case` label that returns the enumerator's spelling as written
// at the call site (e.g. "ENaivePatchCase::Ok").
#define CASE_TO_RETURN_STRING(cs) \
    case cs: return #cs \
// end CASE_TO_RETURN_STRING

// Returns the qualified enumerator name of `cs` for use in test log output.
// Every enumerator listed below is handled inside the switch; the trailing
// return is reached only for a value matching none of them, so control never
// flows off the end of this value-returning function (undefined behaviour).
// The fallback is placed after the switch rather than in a `default:` label so
// that compilers can still warn about enumerators missing from the switch.
TString ToString(ENaivePatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(ENaivePatchCase::Ok);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGetItem);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGet);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnPut);
    }
    return "ENaivePatchCase::<unknown>";
}

NKikimrProto::EReplyStatus GetPatchResultStatus(ENaivePatchCase naiveCase) {
switch (naiveCase) {
case ENaivePatchCase::Ok:
Expand Down Expand Up @@ -156,6 +168,17 @@ enum class EVPatchCase {
Custom,
};

// Returns the qualified enumerator name of `cs` for use in test log output.
// The trailing return is reached only for a value matching none of the listed
// enumerators, so control never flows off the end of this value-returning
// function (undefined behaviour). It sits after the switch rather than in a
// `default:` label so compilers can still warn about unhandled enumerators.
TString ToString(EVPatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(EVPatchCase::Ok);
        CASE_TO_RETURN_STRING(EVPatchCase::OneErrorAndAllPartExistInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::OnePartLostInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::DeadGroupInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::ErrorDuringVPatchDiff);
        CASE_TO_RETURN_STRING(EVPatchCase::Custom);
    }
    return "EVPatchCase::<unknown>";
}

NKikimrProto::EReplyStatus GetPatchResultStatus(EVPatchCase vpatchCase) {
switch (vpatchCase) {
case EVPatchCase::Ok:
Expand Down Expand Up @@ -249,6 +272,15 @@ enum class EMovedPatchCase {
Error
};

// Returns the qualified enumerator name of `cs` for use in test log output.
// The trailing return is reached only for a value matching none of the listed
// enumerators, so control never flows off the end of this value-returning
// function (undefined behaviour). It sits after the switch rather than in a
// `default:` label so compilers can still warn about unhandled enumerators.
TString ToString(EMovedPatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(EMovedPatchCase::Ok);
        CASE_TO_RETURN_STRING(EMovedPatchCase::Error);
    }
    return "EMovedPatchCase::<unknown>";
}

#undef CASE_TO_RETURN_STRING

NKikimrProto::EReplyStatus GetPatchResultStatus(EMovedPatchCase movedCase) {
switch (movedCase) {
case EMovedPatchCase::Ok:
Expand Down Expand Up @@ -289,7 +321,7 @@ void ReceivePatchResult(TTestBasicRuntime &runtime, const TTestArgs &args, NKiki
}

void ConductGet(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
CTEST << "ConductGet: Start\n";
CTEST << "ConductGet: Start NaiveCase: " << ToString(naiveCase) << "\n";
NKikimrProto::EReplyStatus resultStatus = GetGetResultStatus(naiveCase);
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvGet *get = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvGet>(handle);
Expand Down Expand Up @@ -328,10 +360,10 @@ TString MakePatchedBuffer(const TTestArgs &args) {
void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
NKikimrProto::EReplyStatus resultStatus = GetPutResultStatus(naiveCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
CTEST << "ConductPut: Skip\n";
CTEST << "ConductPut: Skip NaiveCase: " << ToString(naiveCase) << "\n";
return;
}
CTEST << "ConductPut: Start\n";
CTEST << "ConductPut: Start NaiveCase: " << ToString(naiveCase) << "\n";
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvPut *put = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvPut>(handle);
UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId);
Expand All @@ -346,22 +378,35 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa
}

// Drives one naive-patch scenario for `naiveCase`: calls ConductGet, then
// ConductPut, and finally hands the status from GetPatchResultStatus(naiveCase)
// to ReceivePatchResult. Start and finish are reported on CTEST.
void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
    // A single start banner that carries the case name; the bare
    // "ConductNaivePatch: Start" line duplicated it and has been dropped.
    CTEST << "ConductNaivePatch: Start NaiveCase: " << ToString(naiveCase) << Endl;
    ConductGet(runtime, args, naiveCase);
    ConductPut(runtime, args, naiveCase);
    NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(naiveCase);
    ReceivePatchResult(runtime, args, resultStatus);
    CTEST << "ConductNaivePatch: Finish\n";
}

// Formats `lst` as "[a, b, c]": elements are streamed in order, separated by
// ", ", and wrapped in square brackets. An empty vector yields "[]".
template <typename InnerType>
TString ToString(const TVector<InnerType> &lst) {
    TStringBuilder out;
    out << '[';
    bool needSeparator = false;
    for (const auto &item : lst) {
        // The separator goes before every element except the first one.
        if (needSeparator) {
            out << ", ";
        }
        out << item;
        needSeparator = true;
    }
    out << ']';
    return out;
}

void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.OriginalId.Hash());
CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.OriginalId.Hash());
auto [status, parts] = GetVPatchFoundPartsStatus(env, args, naiveCase, vdiskPointer);
auto [status, parts] = GetVPatchFoundPartsStatus(env, args, vpatchCase, vdiskPointer);

auto start = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVPatchStart>({env.VDisks[vdiskIdx]});
auto &startRecord = start->Get()->Record;
Expand All @@ -376,21 +421,22 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons
for (auto partId : parts) {
foundParts->AddPart(partId);
}
CTEST << "ConductVPatchStart: Send FoundParts vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "parts# " << ToString(parts) << "\n";
SendByHandle(runtime, start, std::move(foundParts));
CTEST << "ConductVPatchStart: Finish vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
}

void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.PatchedId.Hash());
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.PatchedId.Hash());
NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, naiveCase, vdiskPointer);
NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, vpatchCase, vdiskPointer);
if (resultStatus == NKikimrProto::UNKNOWN) {
CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
return;
}
CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";

auto diffEv = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVPatchDiff>({env.VDisks[vdiskIdx]});
auto &diffRecord = diffEv->Get()->Record;
Expand All @@ -415,6 +461,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const
}

void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args) {
return; // disabled vpatch
CTEST << "ConductFailedVPatch: Start\n";
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
Expand All @@ -429,7 +476,7 @@ void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con


void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMovedPatchCase movedCase) {
CTEST << "ConductVMovedPatch: Start\n";
CTEST << "ConductVMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
NKikimrProto::EReplyStatus resultStatus = GetVMovedPatchResultStatus(movedCase);
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvVMovedPatch *vPatch = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVMovedPatch>(handle);
Expand Down Expand Up @@ -459,7 +506,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove
void ConductMovedPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EMovedPatchCase movedCase)
{
CTEST << "ConductMovedPatch: Start\n";
CTEST << "ConductMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
ConductFailedVPatch(runtime, env, args);
ConductVMovedPatch(runtime, args, movedCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(movedCase);
Expand All @@ -481,7 +528,8 @@ void ConductFallbackPatch(TTestBasicRuntime &runtime, const TTestArgs &args) {
void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
CTEST << "ConductVPatchEvents: Start\n";
return; // disabled vpatch
CTEST << "ConductVPatchEvents: Start VPatchCase: " << ToString(vpatchCase) << Endl;
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
ConductVPatchStart(runtime, env, args, vpatchCase, vdisk);
Expand All @@ -496,7 +544,7 @@ void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con
void ConductVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
CTEST << "ConductFallbackPatch: Start\n";
CTEST << "ConductFallbackPatch: Start VPatchCase: " << ToString(vpatchCase) << Endl;
ConductVPatchEvents(runtime, env, args, vpatchCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(vpatchCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
Expand Down Expand Up @@ -620,17 +668,18 @@ void RunGeneralTest(void(*runner)(TTestBasicRuntime &runtime, const TTestArgs &a
Y_UNIT_TEST_NAIVE(ErrorOnPut, erasure) \
Y_UNIT_TEST_MOVED(Ok, erasure) \
Y_UNIT_TEST_MOVED(Error, erasure) \
Y_UNIT_TEST_VPATCH(Ok, erasure) \
Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure) \
Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure) \
Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure) \
Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure) \
Y_UNIT_TEST_SECURED(Ok, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGetItem, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGet, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnPut, erasure) \
// end Y_UNIT_TEST_PATCH_PACK

// Y_UNIT_TEST_VPATCH(Ok, erasure)
// Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure)
// Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure)
// Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure)
// Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure)

Y_UNIT_TEST_PATCH_PACK(ErasureNone)
Y_UNIT_TEST_PATCH_PACK(Erasure4Plus2Block)
Y_UNIT_TEST_PATCH_PACK(ErasureMirror3dc)
Expand Down Expand Up @@ -712,6 +761,7 @@ EFaultToleranceCase GetFaultToleranceCaseForBlock4Plus2(const TDSProxyEnv &env,
}
}
}
return EFaultToleranceCase::Fallback; // disabled vpatch
if (layout.CountEffectiveReplicas(env.Info->Type) == env.Info->Type.TotalPartCount()) {
return EFaultToleranceCase::Ok;
} else {
Expand All @@ -736,6 +786,7 @@ EFaultToleranceCase GetFaultToleranceCaseForMirror3dc(const TDSProxyEnv &env, co
for (ui32 dcIdx = 0; dcIdx < dcCnt; ++dcIdx) {
x2cnt += (replInDc[dcIdx] >= 2);
}
return EFaultToleranceCase::Fallback; // disabled vpatch
if ((replInDc[0] && replInDc[1] && replInDc[2]) || x2cnt >= 2) {
return EFaultToleranceCase::Ok;
} else {
Expand Down
78 changes: 77 additions & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@ namespace NKikimr {
if (deadline != TInstant::Max()) {
this->Record.MutableMsgQoS()->SetDeadlineSeconds((ui32)deadline.Seconds());
}
this->Record.MutableMsgQoS()->SetExtQueueId(HandleClassToQueueId(NKikimrBlobStorage::AsyncBlob));
this->Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::PutAsyncBlob);
}

bool GetIgnoreBlock() const {
Expand Down Expand Up @@ -1950,6 +1950,25 @@ namespace NKikimr {
}
Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::GetFastRead);
}

// Human-readable dump of this event's own record.
TString ToString() const {
    return ToString(this->Record);
}

// Renders `record` as "{TEvVPatchStart OriginalBlobId# <id> PatchedBlobId# <id>}".
// When the record carries MsgQoS, a space and the TEvVPut::OutMsgQos dump of it
// are appended before the closing brace.
static TString ToString(const NKikimrBlobStorage::TEvVPatchStart &record) {
    const TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
    const TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());

    TStringStream out;
    out << "{TEvVPatchStart"
        << " OriginalBlobId# " << originalId.ToString()
        << " PatchedBlobId# " << patchedId.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};

struct TEvBlobStorage::TEvVPatchFoundParts
Expand Down Expand Up @@ -1995,6 +2014,25 @@ namespace NKikimr {
Record.SetStatus(status);
}

// Human-readable dump of this event's own record.
TString ToString() const {
    return ToString(this->Record);
}

// Renders `record` as "{TEvVPatchFoundParts OriginalBlobId# <id> PatchedBlobId# <id>}".
// When the record carries MsgQoS, a space and the TEvVPut::OutMsgQos dump of it
// are appended before the closing brace.
static TString ToString(const NKikimrBlobStorage::TEvVPatchFoundParts &record) {
    const TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
    const TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());

    TStringStream out;
    out << "{TEvVPatchFoundParts"
        << " OriginalBlobId# " << originalId.ToString()
        << " PatchedBlobId# " << patchedId.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}

void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason,
const NKikimrBlobStorage::TEvVPatchStart &request) {
Record.SetErrorReason(errorReason);
Expand Down Expand Up @@ -2084,6 +2122,25 @@ namespace NKikimr {
}
return result;
}

// Human-readable dump of this event's own record.
TString ToString() const {
    return ToString(this->Record);
}

// Renders `record` as "{TEvVPatchDiff OriginalBlobId# <id> PatchedBlobId# <id>}".
// The ids are read from the OriginalPartBlobId / PatchedPartBlobId fields even
// though the labels say "BlobId". When the record carries MsgQoS, a space and
// the TEvVPut::OutMsgQos dump of it are appended before the closing brace.
static TString ToString(const NKikimrBlobStorage::TEvVPatchDiff &record) {
    const TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
    const TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());

    TStringStream out;
    out << "{TEvVPatchDiff"
        << " OriginalBlobId# " << originalId.ToString()
        << " PatchedBlobId# " << patchedId.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};


Expand Down Expand Up @@ -2129,6 +2186,25 @@ namespace NKikimr {
}
return result;
}

// Human-readable dump of this event's own record.
TString ToString() const {
    return ToString(this->Record);
}

// Renders `record` as "{TEvVPatchXorDiff OriginalBlobId# <id> PatchedBlobId# <id>}".
// The ids are read from the OriginalPartBlobId / PatchedPartBlobId fields even
// though the labels say "BlobId". When the record carries MsgQoS, a space and
// the TEvVPut::OutMsgQos dump of it are appended before the closing brace.
static TString ToString(const NKikimrBlobStorage::TEvVPatchXorDiff &record) {
    const TLogoBlobID originalId = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
    const TLogoBlobID patchedId = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());

    TStringStream out;
    out << "{TEvVPatchXorDiff"
        << " OriginalBlobId# " << originalId.ToString()
        << " PatchedBlobId# " << patchedId.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};

struct TEvBlobStorage::TEvVPatchXorDiffResult
Expand Down
Loading
Loading