Skip to content

Merge some fixes from main #1672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class TBlobStorageQueue {
return Queues.InFlight.size();
}

ui64 GetInFlightCost() const {
return InFlightCost;
}

void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
const TBlobStorageGroupType& type);
void InvalidateCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
<< " msgId# " << msgId << " sequenceId# " << sequenceId
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
<< " InFlightCost# " << Queue.GetInFlightCost()
<< " InFlightCount# " << Queue.InFlightCount()
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
<< " BytesWaiting# " << Queue.GetBytesWaiting());

switch (ws) {
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ namespace NKikimr {
}
}

TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
Y_UNUSED(checkMsgId);
Y_UNUSED(msgId);
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
Y_ABORT_UNLESS(Cost >= cost);
Cost -= cost;
--InFlight;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
msg->SetId(ReaderTabletData->Id);
msg->SetGeneration(ReaderTabletData->Generation);
}
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
<< " vget# " << vget->ToString());
}

for (auto& vget : gets) {
if (vget) {
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
<< " vget# " << vget->ToString());
outVGets.push_back(std::move(vget));
++RequestIndex;
}
Expand Down
26 changes: 17 additions & 9 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
TStackVec<bool, TypicalDisksInSubring> ReceivedResponseFlags;
TStackVec<bool, TypicalDisksInSubring> EmptyResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ErrorResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ForceStopFlags;
TBlobStorageGroupInfo::TVDiskIds VDisks;

bool UseVPatch = false;
Expand Down Expand Up @@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}

void Handle(TEvBlobStorage::TEvVPatchResult::TPtr &ev) {
ReceivedResults++;
NKikimrBlobStorage::TEvVPatchResult &record = ev->Get()->Record;

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();
if (ForceStopFlags[subgroupIdx]) {
return; // ignore force stop response
}
ReceivedResults++;

PullOutStatusFlagsAndFressSpace(record);
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
Expand All @@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
errorReason = record.GetErrorReason();
}

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();

PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
(Status, status),
(SubgroupIdx, (ui32)subgroupIdx),
Expand Down Expand Up @@ -413,15 +418,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
void SendStopDiffs() {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs");
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events;
for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) {
if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) {
for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) {
if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) {
std::unique_ptr<TEvBlobStorage::TEvVPatchDiff> ev = std::make_unique<TEvBlobStorage::TEvVPatchDiff>(
OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx);
OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx);
ev->SetForceEnd();
ForceStopFlags[subgroupIdx] = true;
events.emplace_back(std::move(ev));
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message",
(VDiskIdxInSubgroup, vdiskIdx),
(VDiskId, VDisks[vdiskIdx]));
(VDiskIdxInSubgroup, subgroupIdx),
(VDiskId, VDisks[subgroupIdx]));
}
}
SendToQueues(events, false);
Expand Down Expand Up @@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
(VDiskIdxInSubgroup, idxInSubgroup),
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
(PartId, (ui64)partPlacement.PartId),
(DiffsForPart, diffsForPart.size()),
(ParityPlacements, parityPlacements.size()),
(WaitedXorDiffs, waitedXorDiffs));
Expand Down Expand Up @@ -586,6 +593,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
ReceivedResponseFlags.assign(VDisks.size(), false);
ErrorResponseFlags.assign(VDisks.size(), false);
EmptyResponseFlags.assign(VDisks.size(), false);
ForceStopFlags.assign(VDisks.size(), false);

TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ struct TPDiskMockState::TImpl {
}
}

bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize;
return static_cast<bool>(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end});
}

std::set<ui32> GetChunks() {
std::set<ui32> res;
for (auto& [ownerId, owner] : Owners) {
Expand Down Expand Up @@ -290,6 +295,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool
Impl->SetCorruptedArea(chunkIdx, begin, end, enabled);
}

bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) {
return Impl->HasCorruptedArea(chunkIdx, begin, end);
}

std::set<ui32> TPDiskMockState::GetChunks() {
return Impl->GetChunks();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/pdisk/mock/pdisk_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace NKikimr {
~TPDiskMockState();

void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled);
bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end);
std::set<ui32> GetChunks();
TMaybe<NPDisk::TOwnerRound> GetOwnerRound(const TVDiskID& vDiskId) const;
ui32 GetChunkSize() const;
Expand Down
152 changes: 109 additions & 43 deletions ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,130 @@ void Test() {

TString data = TString::Uninitialized(8_MB);
memset(data.Detach(), 'X', data.size());
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}

auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
if (status == NKikimrProto::OK) {
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
for (ui32 step = 1; step < 100; ++step) {
TLogoBlobID id(1, 1, step, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
};

checkReadable(NKikimrProto::OK);
auto checkReadable = [&] {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);

ui32 partsMask = 0;
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TVDiskID& vdiskId = info->GetVDiskId(i);
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
const TActorId sender = runtime->AllocateEdgeActor(1);
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead);
ev->AddExtremeQuery(id, 0, 0);
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
auto& record = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
for (const auto& result : record.GetResult()) {
if (result.GetStatus() == NKikimrProto::OK) {
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
UNIT_ASSERT(id.PartId());
const ui32 partIdx = id.PartId() - 1;
const ui32 mask = 1 << partIdx;
UNIT_ASSERT(!(partsMask & mask));
partsMask |= mask;
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
}
}
});
}
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
};

for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);
checkReadable();

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
ui32 mask = 0;

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
const TDiskPart& part = item.Location;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
break;
ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
mask |= 1 << i;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
break;
}
}

checkReadable();
}

checkReadable(NKikimrProto::OK);
}
env.Sim(TDuration::Seconds(60));

env.Sim(TDuration::Seconds(60));
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
if (~mask >> i & 1) {
continue;
}

const TActorId vdiskActorId = info->GetActorId(i);

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

bool anyPartReadable = false;

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size);
if (anyPartReadable) {
break;
}
}
}

UNIT_ASSERT(anyPartReadable);
}
}
}

Y_UNIT_TEST_SUITE(ScrubFast) {
Y_UNIT_TEST(SingleBlob) {
Test();
}
}

13 changes: 11 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ namespace NKikimr {
: public TEventLocal<TEvVDiskRequestCompleted, TEvBlobStorage::EvVDiskRequestCompleted> {
TVMsgContext Ctx;
std::unique_ptr<IEventHandle> Event;
bool DoNotResend;

TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event)
TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event, bool doNotResend = false)
: Ctx(ctx)
, Event(std::move(event))
, DoNotResend(doNotResend)
{
Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown);
Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown);
Expand Down Expand Up @@ -468,6 +470,9 @@ namespace NKikimr {
TActorIDPtr SkeletonFrontIDPtr;
THPTimer ExecutionTimer;

protected:
bool DoNotResendFromSkeletonFront = false;

public:
TEvVResultBaseWithQoSPB() = default;

Expand Down Expand Up @@ -526,7 +531,7 @@ namespace NKikimr {
byteSize, this->ToString().data());

if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) {
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev)));
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront));
} else {
TActivationContext::Send(ev.release());
}
Expand Down Expand Up @@ -2182,6 +2187,10 @@ namespace NKikimr {
Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare);
}

void SetForceEndResponse() {
DoNotResendFromSkeletonFront = true;
}

void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason,
const NKikimrBlobStorage::TEvVPatchDiff &request)
{
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ namespace NKikimr {

// a map to fill upon receiving VGet result
struct TPerBlobInfo {
const TInstant Deadline;
std::weak_ptr<TInFlightContext> Context;
TEvRecoverBlobResult::TItem *Item; // item to update
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
};
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;

void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
void SendPendingQueries();
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);
Expand Down
Loading