Skip to content

Commit 4fde7f6

Browse files
mregrockEgor Kulin
and
Egor Kulin
authored
Fix for TEvPut/GetResult (#6040)
Co-authored-by: Egor Kulin <[email protected]>
1 parent 8e0efd4 commit 4fde7f6

File tree

9 files changed

+38
-17
lines changed

9 files changed

+38
-17
lines changed

ydb/core/base/blobstorage.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,7 @@ struct TEvBlobStorage {
10011001
NKikimrProto::EReplyStatus Status;
10021002
const TLogoBlobID Id;
10031003
const TStorageStatusFlags StatusFlags;
1004-
const TGroupId GroupId;
1004+
const ui32 GroupId;
10051005
const float ApproximateFreeSpaceShare; // 0.f has special meaning 'data could not be obtained'
10061006
TString ErrorReason;
10071007
bool WrittenBeyondBarrier = false; // was this blob written beyond the barrier?
@@ -1014,6 +1014,16 @@ struct TEvBlobStorage {
10141014
: Status(status)
10151015
, Id(id)
10161016
, StatusFlags(statusFlags)
1017+
, GroupId(groupId.GetRawId())
1018+
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
1019+
, StorageId(storageId)
1020+
{}
1021+
1022+
TEvPutResult(NKikimrProto::EReplyStatus status, const TLogoBlobID &id, const TStorageStatusFlags statusFlags,
1023+
ui32 groupId, float approximateFreeSpaceShare, const TString& storageId = Default<TString>())
1024+
: Status(status)
1025+
, Id(id)
1026+
, StatusFlags(statusFlags)
10171027
, GroupId(groupId)
10181028
, ApproximateFreeSpaceShare(approximateFreeSpaceShare)
10191029
, StorageId(storageId)
@@ -1231,7 +1241,7 @@ struct TEvBlobStorage {
12311241
// todo: replace with queue-like thing
12321242
ui32 ResponseSz;
12331243
TArrayHolder<TResponse> Responses;
1234-
const TGroupId GroupId;
1244+
const ui32 GroupId;
12351245
ui32 BlockedGeneration = 0; // valid only for requests with non-zero TabletId and true AcquireBlockedGeneration.
12361246
TString DebugInfo;
12371247
TString ErrorReason;
@@ -1242,6 +1252,13 @@ struct TEvBlobStorage {
12421252
TInstant Sent;
12431253

12441254
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, TGroupId groupId)
1255+
: Status(status)
1256+
, ResponseSz(sz)
1257+
, Responses(sz == 0 ? nullptr : new TResponse[sz])
1258+
, GroupId(groupId.GetRawId())
1259+
{}
1260+
1261+
TEvGetResult(NKikimrProto::EReplyStatus status, ui32 sz, ui32 groupId)
12451262
: Status(status)
12461263
, ResponseSz(sz)
12471264
, Responses(sz == 0 ? nullptr : new TResponse[sz])

ydb/core/blobstorage/dsproxy/dsproxy_get.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
368368
LWTRACK(DSProxyGetReply, Orbit);
369369
evResult->Orbit = std::move(Orbit);
370370
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvGet, requestSize, duration.SecondsFloat() * 1000.0, tabletId,
371-
evResult->GroupId.GetRawId(), channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
371+
evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
372372
success);
373373
A_LOG_LOG_S(true, success ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPG68", "Result# " << evResult->Print(false));
374374
return SendResponseAndDie(std::unique_ptr<TEvBlobStorage::TEvGetResult>(evResult.Release()));

ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,11 @@ namespace NKikimr::NTesting {
9393
QueryToGroup.erase(it);
9494

9595
auto& msg = *ev.Get<T>();
96-
if constexpr (T::EventType != TEvBlobStorage::EvBlockResult &&
96+
97+
if constexpr (T::EventType == TEvBlobStorage::EvGetResult || T::EventType == TEvBlobStorage::EvPutResult){
98+
Y_ABORT_UNLESS(groupId == msg.GroupId);
99+
}
100+
else if constexpr (T::EventType != TEvBlobStorage::EvBlockResult &&
97101
T::EventType != TEvBlobStorage::EvInplacePatchResult &&
98102
T::EventType != TEvBlobStorage::EvCollectGarbageResult &&
99103
T::EventType != TEvBlobStorage::EvDiscoverResult) {

ydb/core/keyvalue/keyvalue_storage_read_request.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ class TKeyValueStorageReadRequest : public TActorBootstrapped<TKeyValueStorageRe
226226

227227
TGetBatch &batch = Batches[ev->Cookie];
228228

229-
if (result->GroupId.GetRawId() != batch.GroupId) {
229+
if (result->GroupId != batch.GroupId) {
230230
STLOG_WITH_ERROR_DESCRIPTION(ErrorDescription, NLog::PRI_ERROR, NKikimrServices::KEYVALUE, KV318,
231231
"Received EvGetResult from an unexpected storage group.",
232232
(KeyValue, TabletInfo->TabletID),
233-
(GroupId, result->GroupId.GetRawId()),
233+
(GroupId, result->GroupId),
234234
(ExpecetedGroupId, batch.GroupId),
235235
(Status, result->Status),
236236
(Deadline, IntermediateResult->Deadline.MilliSeconds()),

ydb/core/keyvalue/keyvalue_storage_request.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques
109109
IntermediateResults->Stat.PutLatencies.push_back(duration.MilliSeconds());
110110

111111
auto groupId = ev->Get()->GroupId;
112-
CheckYellow(ev->Get()->StatusFlags, groupId.GetRawId());
112+
CheckYellow(ev->Get()->StatusFlags, groupId);
113113

114114
NKikimrProto::EReplyStatus status = ev->Get()->Status;
115115
if (status != NKikimrProto::OK) {
@@ -157,8 +157,8 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques
157157
wr->StatusFlags.Merge(ev->Get()->StatusFlags.Raw);
158158
wr->Latency = duration;
159159
++WriteRequestsReplied;
160-
IntermediateResults->Stat.GroupWrittenBytes[std::make_pair(ev->Get()->Id.Channel(), groupId.GetRawId())] += ev->Get()->Id.BlobSize();
161-
IntermediateResults->Stat.GroupWrittenIops[std::make_pair(ev->Get()->Id.Channel(), groupId.GetRawId())] += 1; // FIXME: count distinct blobs?
160+
IntermediateResults->Stat.GroupWrittenBytes[std::make_pair(ev->Get()->Id.Channel(), groupId)] += ev->Get()->Id.BlobSize();
161+
IntermediateResults->Stat.GroupWrittenIops[std::make_pair(ev->Get()->Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs?
162162
UpdateRequest(ctx);
163163
}
164164

@@ -337,8 +337,8 @@ class TKeyValueStorageRequest : public TActorBootstrapped<TKeyValueStorageReques
337337
if (response.Status == NKikimrProto::OK) {
338338
Y_ABORT_UNLESS(response.Buffer.size() == readItem.BlobSize);
339339
Y_ABORT_UNLESS(readItem.ValueOffset + readItem.BlobSize <= read.ValueSize);
340-
IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId.GetRawId())] += response.Buffer.size();
341-
IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId.GetRawId())] += 1; // FIXME: count distinct blobs?
340+
IntermediateResults->Stat.GroupReadBytes[std::make_pair(response.Id.Channel(), groupId)] += response.Buffer.size();
341+
IntermediateResults->Stat.GroupReadIops[std::make_pair(response.Id.Channel(), groupId)] += 1; // FIXME: count distinct blobs?
342342
read.Value.Write(readItem.ValueOffset, std::move(response.Buffer));
343343
} else {
344344
Y_VERIFY_DEBUG_S(response.Status != NKikimrProto::NODATA, "NODATA received for TEvGet"

ydb/core/tablet/tablet_req_rebuildhistory.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,8 +589,8 @@ class TTabletReqRebuildHistoryGraph : public TActorBootstrapped<TTabletReqRebuil
589589
switch (response.Status) {
590590
case NKikimrProto::OK:
591591
Y_ABORT_UNLESS(1 == RefsToCheck.erase(response.Id));
592-
GroupReadBytes[std::make_pair(response.Id.Channel(), msg->GroupId.GetRawId())] += response.Buffer.size();
593-
GroupReadOps[std::make_pair(response.Id.Channel(), msg->GroupId.GetRawId())] += 1;
592+
GroupReadBytes[std::make_pair(response.Id.Channel(), msg->GroupId)] += response.Buffer.size();
593+
GroupReadOps[std::make_pair(response.Id.Channel(), msg->GroupId)] += 1;
594594
break;
595595
case NKikimrProto::NODATA:
596596
BLOG_W("TTabletReqRebuildHistoryGraph::CheckReferences - NODATA for blob " << response.Id, "TRRH07");

ydb/core/tablet/tablet_req_writelog.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ class TTabletReqWriteLog : public TActorBootstrapped<TTabletReqWriteLog> {
4949
case NKikimrProto::OK:
5050
LOG_DEBUG_S(ctx, NKikimrServices::TABLET_MAIN, "Put Result: " << msg->Print(false));
5151

52-
GroupWrittenBytes[std::make_pair(msg->Id.Channel(), msg->GroupId.GetRawId())] += msg->Id.BlobSize();
53-
GroupWrittenOps[std::make_pair(msg->Id.Channel(), msg->GroupId.GetRawId())] += 1;
52+
GroupWrittenBytes[std::make_pair(msg->Id.Channel(), msg->GroupId)] += msg->Id.BlobSize();
53+
GroupWrittenOps[std::make_pair(msg->Id.Channel(), msg->GroupId)] += 1;
5454

5555
ResponseCookies ^= ev->Cookie;
5656

ydb/core/tablet_flat/flat_ops_compact.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ namespace NTabletFlatExecutor {
454454
}
455455

456456
if (ok) {
457-
Send(Owner, new NBlockIO::TEvStat(NBlockIO::EDir::Write, NBlockIO::EPriority::Bulk, msg.GroupId.GetRawId(), msg.Id));
457+
Send(Owner, new NBlockIO::TEvStat(NBlockIO::EDir::Write, NBlockIO::EPriority::Bulk, msg.GroupId, msg.Id));
458458

459459
while (!WriteQueue.empty() && Writing < MaxFlight) {
460460
SendToBs(std::move(WriteQueue.front()));

ydb/core/tx/columnshard/engines/writer/write_controller.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
namespace NKikimr::NColumnShard {
55

66
void IWriteController::OnBlobWriteResult(const TEvBlobStorage::TEvPutResult& result) {
7-
NOlap::TUnifiedBlobId blobId(result.GroupId.GetRawId(), result.Id);
7+
NOlap::TUnifiedBlobId blobId(result.GroupId, result.Id);
88
auto it = WaitingActions.find(result.StorageId ? result.StorageId : NOlap::IStoragesManager::DefaultStorageId);
99
AFL_VERIFY(it != WaitingActions.end());
1010
it->second->OnBlobWriteResult(blobId, result.Status);

0 commit comments

Comments
 (0)