Skip to content

Commit bba277c

Browse files
authored
Fix put impl class (#2829) (#2853)
1 parent 9edd757 commit bba277c

File tree

7 files changed

+108
-188
lines changed

7 files changed

+108
-188
lines changed

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

+23-48
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) {
4848
IsChanged = true;
4949
}
5050

51-
void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) {
52-
Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present);
53-
BlobIdx = blobIdx;
54-
IsChanged = true;
55-
}
56-
5751
bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
5852
const TIntervalVec<i32> fullBlobInterval(0, Id.BlobSize());
5953
const TIntervalSet<i32> here = Whole.Here();
@@ -227,7 +221,7 @@ TString TBlobState::ToString() const {
227221
for (ui32 i = 0; i < Disks.size(); ++i) {
228222
str << Endl << " Disks[" << i << "]# " << Disks[i].ToString() << Endl;
229223
}
230-
str << " BlobIdx# " << (ui32)BlobIdx << Endl;
224+
str << " BlobIdx# " << BlobIdx << Endl;
231225
str << "}";
232226
return str.Str();
233227
}
@@ -304,7 +298,7 @@ void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui3
304298
}
305299

306300
void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
307-
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
301+
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx) {
308302
PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx);
309303
}
310304

@@ -340,20 +334,6 @@ void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& part
340334
(*this)[id].AddPartToPut(partIdx, std::move(partData));
341335
}
342336

343-
void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) {
344-
Y_ABORT_UNLESS(bool(id));
345-
Y_ABORT_UNLESS(id.PartId() == 0);
346-
Y_ABORT_UNLESS(id.BlobSize() != 0);
347-
(*this)[id].MarkBlobReadyToPut(blobIdx);
348-
}
349-
350-
void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) {
351-
Y_ABORT_UNLESS(bool(id));
352-
Y_ABORT_UNLESS(id.PartId() == 0);
353-
Y_ABORT_UNLESS(id.BlobSize() != 0);
354-
DoneBlobStates.insert(BlobStates.extract(id));
355-
}
356-
357337
void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
358338
Y_ABORT_UNLESS(bool(id));
359339
Y_ABORT_UNLESS(id.PartId() != 0);
@@ -390,8 +370,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
390370
}
391371

392372
EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
393-
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
394-
TString errorReason;
373+
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
395374
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
396375
auto& blob = it->second;
397376
if (!std::exchange(blob.IsChanged, false)) {
@@ -401,23 +380,19 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
401380

402381
// recalculate blob outcome if it is not yet determined
403382
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
383+
TString errorReason;
404384
for (IStrategy *strategy : s) {
405385
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
406386
case EStrategyOutcome::IN_PROGRESS:
407387
status = NKikimrProto::UNKNOWN;
408388
break;
409389

410390
case EStrategyOutcome::ERROR:
411-
if (IsAllRequestsTogether) {
391+
if (!finished) {
412392
return res;
413393
}
414-
if (errorReason) {
415-
errorReason += " && ";
416-
errorReason += res.ErrorReason;
417-
} else {
418-
errorReason = res.ErrorReason;
419-
}
420394
status = NKikimrProto::ERROR;
395+
errorReason = std::move(res.ErrorReason);
421396
break;
422397

423398
case EStrategyOutcome::DONE:
@@ -431,26 +406,25 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
431406
status = NKikimrProto::UNKNOWN;
432407
}
433408
if (status != NKikimrProto::UNKNOWN) {
409+
if (finished) { // we are operating on independent blobs
410+
finished->push_back(TFinishedBlob{
411+
blob.BlobIdx,
412+
status,
413+
std::move(errorReason),
414+
});
415+
}
434416
const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++));
435417
Y_ABORT_UNLESS(inserted);
436-
if (!IsAllRequestsTogether) {
437-
blob.Status = status;
438-
if (finished) {
439-
finished->push_back(&*doneIt);
440-
}
441-
}
442418
} else {
443419
++it;
444420
}
445421
}
446422

447-
EStrategyOutcome outcome(BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
448-
outcome.ErrorReason = std::move(errorReason);
449-
return outcome;
423+
return BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS;
450424
}
451425

452426
EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
453-
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
427+
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
454428
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
455429
}
456430

@@ -464,8 +438,7 @@ TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
464438
<< " blobId# " << fullId
465439
<< " BlackBoard# " << ToString());
466440
}
467-
TBlobState &state = it->second;
468-
return state;
441+
return it->second;
469442
}
470443

471444
ssize_t TBlackboard::AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex) {
@@ -512,8 +485,12 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T
512485
}
513486
}
514487

515-
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id) {
516-
(*this)[id];
488+
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
489+
const auto [it, inserted] = BlobStates.try_emplace(id);
490+
Y_ABORT_UNLESS(inserted);
491+
TBlobState& state = it->second;
492+
state.Init(id, *Info);
493+
state.BlobIdx = blobIdx;
517494
}
518495

519496
TBlobState& TBlackboard::operator [](const TLogoBlobID& id) {
@@ -559,9 +536,7 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
559536
const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
560537
for (auto& [id, state] : BlobStates) {
561538
if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) {
562-
TBlobState::TDisk& disk = state.Disks[diskIdx];
563-
for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
564-
TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
539+
for (TBlobState::TDiskPart& part : state.Disks[diskIdx].DiskParts) {
565540
if (part.Situation == TBlobState::ESituation::Present) {
566541
part.Situation = TBlobState::ESituation::Unknown;
567542
if (state.WholeSituation == TBlobState::ESituation::Present) {

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h

+15-15
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,12 @@ struct TBlobState {
8181
TStackVec<TState, TypicalPartsInBlob> Parts;
8282
TStackVec<TDisk, TypicalDisksInSubring> Disks;
8383
TVector<TEvBlobStorage::TEvGetResult::TPartMapItem> PartMap;
84-
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
85-
ui8 BlobIdx;
84+
size_t BlobIdx;
8685
bool IsChanged = false;
8786

8887
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
8988
void AddNeeded(ui64 begin, ui64 size);
9089
void AddPartToPut(ui32 partIdx, TRope&& partData);
91-
void MarkBlobReadyToPut(ui8 blobIdx = 0);
9290
bool Restore(const TBlobStorageGroupInfo &info);
9391
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
9492
ui32 shift, TRope&& data);
@@ -133,9 +131,9 @@ struct TDiskPutRequest {
133131
TRope Buffer;
134132
EPutReason Reason;
135133
bool IsHandoff;
136-
ui8 BlobIdx;
134+
size_t BlobIdx;
137135

138-
TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx)
136+
TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, size_t blobIdx)
139137
: OrderNumber(orderNumber)
140138
, Id(id)
141139
, Buffer(std::move(buffer))
@@ -152,7 +150,7 @@ struct TGroupDiskRequests {
152150
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
153151
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size);
154152
void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
155-
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx);
153+
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx);
156154
};
157155

158156
struct TBlackboard;
@@ -170,6 +168,12 @@ struct TBlackboard {
170168
AccelerationModeSkipMarked
171169
};
172170

171+
struct TFinishedBlob {
172+
size_t BlobIdx;
173+
NKikimrProto::EReplyStatus Status;
174+
TString ErrorReason;
175+
};
176+
173177
using TBlobStates = TMap<TLogoBlobID, TBlobState>;
174178
TBlobStates BlobStates;
175179
TBlobStates DoneBlobStates;
@@ -179,31 +183,27 @@ struct TBlackboard {
179183
EAccelerationMode AccelerationMode;
180184
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
181185
const NKikimrBlobStorage::EGetHandleClass GetHandleClass;
182-
const bool IsAllRequestsTogether;
183186

184187
TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
185-
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass,
186-
bool isAllRequestsTogether = true)
188+
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass)
187189
: Info(info)
188190
, GroupQueues(groupQueues)
189191
, AccelerationMode(AccelerationModeSkipOneSlowest)
190192
, PutHandleClass(putHandleClass)
191193
, GetHandleClass(getHandleClass)
192-
, IsAllRequestsTogether(isAllRequestsTogether)
193194
{}
194195

195196
void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
196197
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
197-
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
198-
void MoveBlobStateToDone(const TLogoBlobID &id);
199198
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
200199
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
201200
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
202201
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
203202
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);
203+
204204
EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
205-
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
206-
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
205+
TBatchedVec<TFinishedBlob> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
206+
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TFinishedBlob> *finished = nullptr,
207207
const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
208208
TBlobState& GetState(const TLogoBlobID &id);
209209
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
@@ -221,7 +221,7 @@ struct TBlackboard {
221221

222222
void InvalidatePartStates(ui32 orderNumber);
223223

224-
void RegisterBlobForPut(const TLogoBlobID& id);
224+
void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx);
225225

226226
TBlobState& operator [](const TLogoBlobID& id);
227227
};

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

+15-20
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
103103
SanityCheck(); // May Die
104104
}
105105

106-
bool Action() {
106+
bool Action(bool accelerate = false) {
107107
UpdateExpiredVDiskSet();
108108

109109
TPutImpl::TPutResultVec putResults;
110-
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet);
110+
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet, accelerate);
111111
if (ReplyAndDieWithLastResponse(putResults)) {
112112
return true;
113113
}
@@ -133,9 +133,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
133133
return;
134134
}
135135
IsAccelerated = true;
136-
137-
PutImpl.Accelerate(LogCtx);
138-
Action();
136+
Action(true);
139137
// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
140138
}
141139

@@ -210,9 +208,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
210208
HandleIncarnation(issue, orderNumber, record.GetIncarnationGuid());
211209
}
212210

213-
if (Action()) {
214-
return;
215-
}
211+
Action();
216212
}
217213

218214
void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
@@ -265,7 +261,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
265261
if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) {
266262
TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId;
267263
TPutImpl::TPutResultVec putResults;
268-
PutImpl.PrepareOneReply(status, blobId.FullID(), blobIdx, LogCtx, std::move(error), putResults);
264+
PutImpl.PrepareOneReply(status, blobIdx, LogCtx, std::move(error), putResults);
269265
ReplyAndDieWithLastResponse(putResults);
270266
} else {
271267
PutImpl.ProcessResponse(*ev->Get());
@@ -351,7 +347,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
351347
Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors
352348
if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) {
353349
ErrorReason = TStringBuilder() << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vdiskId;
354-
PutImpl.PrepareOneReply(itemStatus, blobId.FullID(), blobIdx, LogCtx, ErrorReason, putResults);
350+
PutImpl.PrepareOneReply(itemStatus, blobIdx, LogCtx, ErrorReason, putResults);
355351
}
356352
}
357353
if (ReplyAndDieWithLastResponse(putResults)) {
@@ -405,7 +401,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
405401
return false;
406402
}
407403

408-
void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, ui64 blobIdx) {
404+
void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, size_t blobIdx) {
409405
NKikimrProto::EReplyStatus status = putResult->Status;
410406
A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21",
411407
"SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent
@@ -449,7 +445,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
449445
TString BlobIdSequenceToString() const {
450446
TStringBuilder blobIdsStr;
451447
blobIdsStr << '[';
452-
for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
448+
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
453449
if (blobIdx) {
454450
blobIdsStr << ' ';
455451
}
@@ -603,7 +599,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
603599

604600
StartTime = TActivationContext::Monotonic();
605601

606-
for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
602+
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
607603
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
608604
}
609605

@@ -703,12 +699,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
703699
s << ' ';
704700
}
705701
s << i;
706-
auto& record = IncarnationRecords[i];
707-
s << '{';
708-
s << "IncarnationGuid# " << record.IncarnationGuid;
709-
s << " ExpirationTimestamp# " << record.ExpirationTimestamp;
710-
s << " StatusIssueTimestamp# " << record.StatusIssueTimestamp;
711-
s << '}';
702+
auto& r = IncarnationRecords[i];
703+
s << '{' << r.IncarnationGuid
704+
<< ' ' << (r.ExpirationTimestamp != TMonotonic::Max() ? TStringBuilder() << r.ExpirationTimestamp : "-"_sb)
705+
<< ' ' << (r.StatusIssueTimestamp != TMonotonic::Zero() ? TStringBuilder() << r.StatusIssueTimestamp : "-"_sb)
706+
<< '}';
712707
}
713708
s << '}';
714709
return s.Str();
@@ -735,7 +730,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
735730
}
736731

737732
STATEFN(StateWait) {
738-
if (ProcessEvent(ev, IsManyPuts)) {
733+
if (ProcessEvent(ev, true)) {
739734
return;
740735
}
741736
const ui32 type = ev->GetTypeRewrite();

0 commit comments

Comments
 (0)