Skip to content

Untie Keep/DoNotKeep flags from blackboard KIKIMR-20527 #773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
}

void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
ui32 shift, TRope&& data) {
// Add actual data to Parts
Y_ABORT_UNLESS(id.PartId() != 0);
const ui32 partIdx = id.PartId() - 1;
Expand All @@ -112,9 +112,6 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
TIntervalVec<i32> responseInterval(shift, shift + dataSize);
diskPart.Requested.Subtract(responseInterval);
}

Keep |= keep;
DoNotKeep |= doNotKeep;
}

void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
Expand Down Expand Up @@ -164,8 +161,7 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo
diskPart.Requested.Clear();
}

void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
bool keep, bool doNotKeep) {
void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(id.PartId() != 0);
const ui32 partIdx = id.PartId() - 1;
IsChanged = true;
Expand All @@ -179,9 +175,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
TDiskPart &diskPart = disk.DiskParts[partIdx];
diskPart.Situation = ESituation::Lost;
diskPart.Requested.Clear();

Keep |= keep;
DoNotKeep |= doNotKeep;
}

ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
Expand Down Expand Up @@ -377,11 +370,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddPutOkResponse(*Info, id, orderNumber);
}

void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
TBlobState &state = GetState(id);
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data), keep, doNotKeep);
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data));
}

void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
Expand All @@ -391,11 +384,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddNoDataResponse(*Info, id, orderNumber);
}

void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) {
void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
TBlobState &state = GetState(id);
state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep);
state.AddNotYetResponse(*Info, id, orderNumber);
}

void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,18 @@ struct TBlobState {
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
ui8 BlobIdx;
bool IsChanged = false;
bool Keep = false;
bool DoNotKeep = false;

void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
void AddPartToPut(ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
ui32 shift, TRope&& data, bool keep, bool doNotKeep);
ui32 shift, TRope&& data);
void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
bool keep, bool doNotKeep);
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
Expand Down Expand Up @@ -204,11 +201,11 @@ struct TBlackboard {
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);
EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas
const TBlobState &blobState = Blackboard.GetState(query.Id);
outResponse.Id = query.Id;
outResponse.PartMap = blobState.PartMap;
outResponse.Keep = blobState.Keep;
outResponse.DoNotKeep = blobState.DoNotKeep;
outResponse.LooksLikePhantom = PhantomCheck
? std::make_optional(blobState.WholeSituation == TBlobState::ESituation::Absent)
: std::nullopt;

// fill in keep/doNotKeep flags
const auto it = BlobFlags.find(query.Id);
std::tie(outResponse.Keep, outResponse.DoNotKeep) = it != BlobFlags.end() ? it->second : std::make_tuple(false, false);

if (blobState.WholeSituation == TBlobState::ESituation::Absent) {
bool okay = true;

Expand Down
13 changes: 10 additions & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TGetImpl {

std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;

std::unordered_map<TLogoBlobID, std::tuple<bool, bool>> BlobFlags; // keep, doNotKeep per blob

public:
TGetImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
TEvBlobStorage::TEvGet *ev, TNodeLayoutInfoPtr&& nodeLayout, const TString& requestPrefix = {})
Expand Down Expand Up @@ -219,6 +221,12 @@ class TGetImpl {
}
}

if (result.HasKeep() || result.HasDoNotKeep()) {
auto& [a, b] = BlobFlags[blobId];
a |= result.GetKeep();
b |= result.GetDoNotKeep();
}

if (replyStatus == NKikimrProto::OK) {
// TODO(cthulhu): Verify shift and response size, and cookie
R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString());
Expand All @@ -228,8 +236,7 @@ class TGetImpl {
resultBuffer.ExtractFrontPlain(temp.GetDataMut(), temp.size());
resultBuffer.Insert(resultBuffer.End(), std::move(temp));
}
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer), result.GetKeep(),
result.GetDoNotKeep());
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer));
} else if (replyStatus == NKikimrProto::NODATA) {
R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
Expand All @@ -243,7 +250,7 @@ class TGetImpl {
} else if (replyStatus == NKikimrProto::NOT_YET) {
R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep());
Blackboard.AddNotYetResponse(blobId, orderNumber);
} else {
Y_ABORT_UNLESS(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data());
}
Expand Down