Skip to content

Commit d0fceb7

Browse files
authored
Untie Keep/DoNotKeep flags from blackboard KIKIMR-20527 (#773)
1 parent 60568b3 commit d0fceb7

File tree

4 files changed

+24
-25
lines changed

4 files changed

+24
-25
lines changed

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

+6-13
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
8787
}
8888

8989
void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
90-
ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
90+
ui32 shift, TRope&& data) {
9191
// Add actual data to Parts
9292
Y_ABORT_UNLESS(id.PartId() != 0);
9393
const ui32 partIdx = id.PartId() - 1;
@@ -112,9 +112,6 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
112112
TIntervalVec<i32> responseInterval(shift, shift + dataSize);
113113
diskPart.Requested.Subtract(responseInterval);
114114
}
115-
116-
Keep |= keep;
117-
DoNotKeep |= doNotKeep;
118115
}
119116

120117
void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
@@ -164,8 +161,7 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo
164161
diskPart.Requested.Clear();
165162
}
166163

167-
void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
168-
bool keep, bool doNotKeep) {
164+
void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
169165
Y_ABORT_UNLESS(id.PartId() != 0);
170166
const ui32 partIdx = id.PartId() - 1;
171167
IsChanged = true;
@@ -179,9 +175,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
179175
TDiskPart &diskPart = disk.DiskParts[partIdx];
180176
diskPart.Situation = ESituation::Lost;
181177
diskPart.Requested.Clear();
182-
183-
Keep |= keep;
184-
DoNotKeep |= doNotKeep;
185178
}
186179

187180
ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
@@ -377,11 +370,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
377370
state.AddPutOkResponse(*Info, id, orderNumber);
378371
}
379372

380-
void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
373+
void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data) {
381374
Y_ABORT_UNLESS(bool(id));
382375
Y_ABORT_UNLESS(id.PartId() != 0);
383376
TBlobState &state = GetState(id);
384-
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data), keep, doNotKeep);
377+
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data));
385378
}
386379

387380
void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
@@ -391,11 +384,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
391384
state.AddNoDataResponse(*Info, id, orderNumber);
392385
}
393386

394-
void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) {
387+
void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) {
395388
Y_ABORT_UNLESS(bool(id));
396389
Y_ABORT_UNLESS(id.PartId() != 0);
397390
TBlobState &state = GetState(id);
398-
state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep);
391+
state.AddNotYetResponse(*Info, id, orderNumber);
399392
}
400393

401394
void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h

+4-7
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,18 @@ struct TBlobState {
8484
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
8585
ui8 BlobIdx;
8686
bool IsChanged = false;
87-
bool Keep = false;
88-
bool DoNotKeep = false;
8987

9088
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
9189
void AddNeeded(ui64 begin, ui64 size);
9290
void AddPartToPut(ui32 partIdx, TRope&& partData);
9391
void MarkBlobReadyToPut(ui8 blobIdx = 0);
9492
bool Restore(const TBlobStorageGroupInfo &info);
9593
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
96-
ui32 shift, TRope&& data, bool keep, bool doNotKeep);
94+
ui32 shift, TRope&& data);
9795
void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber);
9896
void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
9997
void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
100-
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
101-
bool keep, bool doNotKeep);
98+
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
10299
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
103100
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
104101
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
@@ -204,11 +201,11 @@ struct TBlackboard {
204201
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
205202
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
206203
void MoveBlobStateToDone(const TLogoBlobID &id);
207-
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep);
204+
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
208205
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
209206
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
210207
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
211-
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep);
208+
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);
212209
EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
213210
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
214211
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,

ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas
4444
const TBlobState &blobState = Blackboard.GetState(query.Id);
4545
outResponse.Id = query.Id;
4646
outResponse.PartMap = blobState.PartMap;
47-
outResponse.Keep = blobState.Keep;
48-
outResponse.DoNotKeep = blobState.DoNotKeep;
4947
outResponse.LooksLikePhantom = PhantomCheck
5048
? std::make_optional(blobState.WholeSituation == TBlobState::ESituation::Absent)
5149
: std::nullopt;
5250

51+
// fill in keep/doNotKeep flags
52+
const auto it = BlobFlags.find(query.Id);
53+
std::tie(outResponse.Keep, outResponse.DoNotKeep) = it != BlobFlags.end() ? it->second : std::make_tuple(false, false);
54+
5355
if (blobState.WholeSituation == TBlobState::ESituation::Absent) {
5456
bool okay = true;
5557

ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h

+10-3
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ class TGetImpl {
5353

5454
std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;
5555

56+
std::unordered_map<TLogoBlobID, std::tuple<bool, bool>> BlobFlags; // keep, doNotKeep per blob
57+
5658
public:
5759
TGetImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
5860
TEvBlobStorage::TEvGet *ev, TNodeLayoutInfoPtr&& nodeLayout, const TString& requestPrefix = {})
@@ -219,6 +221,12 @@ class TGetImpl {
219221
}
220222
}
221223

224+
if (result.HasKeep() || result.HasDoNotKeep()) {
225+
auto& [a, b] = BlobFlags[blobId];
226+
a |= result.GetKeep();
227+
b |= result.GetDoNotKeep();
228+
}
229+
222230
if (replyStatus == NKikimrProto::OK) {
223231
// TODO(cthulhu): Verify shift and response size, and cookie
224232
R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString());
@@ -228,8 +236,7 @@ class TGetImpl {
228236
resultBuffer.ExtractFrontPlain(temp.GetDataMut(), temp.size());
229237
resultBuffer.Insert(resultBuffer.End(), std::move(temp));
230238
}
231-
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer), result.GetKeep(),
232-
result.GetDoNotKeep());
239+
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer));
233240
} else if (replyStatus == NKikimrProto::NODATA) {
234241
R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber
235242
<< " vDiskId# " << vdisk.ToString());
@@ -243,7 +250,7 @@ class TGetImpl {
243250
} else if (replyStatus == NKikimrProto::NOT_YET) {
244251
R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber
245252
<< " vDiskId# " << vdisk.ToString());
246-
Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep());
253+
Blackboard.AddNotYetResponse(blobId, orderNumber);
247254
} else {
248255
Y_ABORT_UNLESS(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data());
249256
}

0 commit comments

Comments
 (0)