Skip to content

Commit 32f07d1

Browse files
authored
Reduce dsproxy templateness even more (#6646)
1 parent a303aa8 commit 32f07d1

17 files changed

+339
-334
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 46 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -141,26 +141,46 @@ NActors::NLog::EPriority PriorityForStatusOutbound(NKikimrProto::EReplyStatus st
141141
NActors::NLog::EPriority PriorityForStatusResult(NKikimrProto::EReplyStatus status);
142142
NActors::NLog::EPriority PriorityForStatusInbound(NKikimrProto::EReplyStatus status);
143143

144+
#define DSPROXY_ENUM_EVENTS(XX) \
145+
XX(TEvBlobStorage::TEvPut) \
146+
XX(TEvBlobStorage::TEvGet) \
147+
XX(TEvBlobStorage::TEvBlock) \
148+
XX(TEvBlobStorage::TEvDiscover) \
149+
XX(TEvBlobStorage::TEvRange) \
150+
XX(TEvBlobStorage::TEvCollectGarbage) \
151+
XX(TEvBlobStorage::TEvStatus) \
152+
XX(TEvBlobStorage::TEvPatch) \
153+
XX(TEvBlobStorage::TEvAssimilate) \
154+
//
155+
156+
#define DSPROXY_ENUM_DISK_EVENTS(XX) \
157+
XX(TEvBlobStorage::TEvVMovedPatch) \
158+
XX(TEvBlobStorage::TEvVPatchStart) \
159+
XX(TEvBlobStorage::TEvVPatchDiff) \
160+
XX(TEvBlobStorage::TEvVPatchXorDiff) \
161+
XX(TEvBlobStorage::TEvVPut) \
162+
XX(TEvBlobStorage::TEvVMultiPut) \
163+
XX(TEvBlobStorage::TEvVGet) \
164+
XX(TEvBlobStorage::TEvVBlock) \
165+
XX(TEvBlobStorage::TEvVGetBlock) \
166+
XX(TEvBlobStorage::TEvVCollectGarbage) \
167+
XX(TEvBlobStorage::TEvVGetBarrier) \
168+
XX(TEvBlobStorage::TEvVStatus) \
169+
XX(TEvBlobStorage::TEvVAssimilate) \
170+
//
171+
144172
inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay) {
145173
switch (const ui32 type = ev.Type()) {
146174
#define XX(T) \
147-
case TEvBlobStorage::Ev##T: \
148-
static_cast<TEvBlobStorage::TEv##T&>(ev).ExecutionRelay = std::move(executionRelay); \
175+
case T::EventType: \
176+
static_cast<T&>(ev).ExecutionRelay = std::move(executionRelay); \
149177
break; \
150-
case TEvBlobStorage::Ev##T##Result: \
151-
static_cast<TEvBlobStorage::TEv##T##Result&>(ev).ExecutionRelay = std::move(executionRelay); \
178+
case T##Result::EventType: \
179+
static_cast<T##Result&>(ev).ExecutionRelay = std::move(executionRelay); \
152180
break; \
153181
//
154182

155-
XX(Put)
156-
XX(Get)
157-
XX(Block)
158-
XX(Discover)
159-
XX(Range)
160-
XX(CollectGarbage)
161-
XX(Status)
162-
XX(Patch)
163-
XX(Assimilate)
183+
DSPROXY_ENUM_EVENTS(XX)
164184
#undef XX
165185

166186
default:
@@ -220,96 +240,26 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
220240

221241
void BootstrapImpl();
222242

223-
template<typename T>
224-
void CountEvent(const T &ev) const {
225-
ERequestType request = GetRequestType();
226-
Mon->CountEvent(request, ev);
227-
}
243+
void CountEvent(IEventBase *ev, ui32 type) const;
244+
245+
void CountPut(ui32 bufferBytes);
228246

229247
TActorId GetVDiskActorId(const TVDiskIdShort &shortId) const;
230248

249+
bool CheckForTermErrors(bool suppressCommonErrors, const NProtoBuf::Message& record, ui32 type,
250+
NKikimrProto::EReplyStatus status, TVDiskID vdiskId, const NKikimrBlobStorage::TGroupInfo *group,
251+
bool& setErrorAndPostpone, bool& setRaceToError);
231252
bool ProcessEvent(TAutoPtr<IEventHandle>& ev, bool suppressCommonErrors = false);
232253

233-
template<typename TEv>
234-
void CountPut(const std::unique_ptr<TEv>& ev) {
235-
++GeneratedSubrequests;
236-
GeneratedSubrequestBytes += ev->GetBufferBytes();
237-
}
238-
239-
template<typename TEv>
240-
void CountPuts(const TDeque<std::unique_ptr<TEv>>& q) {
241-
for (const auto& item : q) {
242-
CountPut(item);
243-
}
244-
}
245-
246-
template<typename... TOptions>
247-
void CountPuts(const TDeque<std::variant<TOptions...>>& q) {
248-
for (const auto& item : q) {
249-
std::visit([&](auto& item) { CountPut(item); }, item);
250-
}
251-
}
252-
253-
template<typename T>
254-
void SendToQueue(std::unique_ptr<T> event, ui64 cookie, bool timeStatsEnabled = false) {
255-
if constexpr (
256-
!std::is_same_v<T, TEvBlobStorage::TEvVGetBlock>
257-
&& !std::is_same_v<T, TEvBlobStorage::TEvVBlock>
258-
&& !std::is_same_v<T, TEvBlobStorage::TEvVStatus>
259-
&& !std::is_same_v<T, TEvBlobStorage::TEvVCollectGarbage>
260-
&& !std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>
261-
) {
262-
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
263-
event->Record.MutableTimestamps()->SetSentByDSProxyUs(GetCycleCountFast() / cyclesPerUs);
264-
}
265-
266-
if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus> && !std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>) {
267-
event->MessageRelevanceTracker = MessageRelevanceTracker;
268-
ui64 cost;
269-
if constexpr (std::is_same_v<T, TEvBlobStorage::TEvVMultiPut>) {
270-
bool internalQueue;
271-
cost = CostModel->GetCost(*event, &internalQueue);
272-
} else {
273-
cost = CostModel->GetCost(*event);
274-
}
275-
*PoolCounters->DSProxyDiskCostCounter += cost;
276-
277-
LOG_TRACE_S(TActivationContext::AsActorContext(), NKikimrServices::BS_REQUEST_COST,
278-
"DSProxy Request Type# " << TypeName(*event) << " Cost# " << cost);
279-
}
254+
void SendToQueue(std::unique_ptr<IEventBase> event, ui64 cookie, bool timeStatsEnabled = false);
280255

281-
const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span.GetTraceId(),
282-
timeStatsEnabled);
283-
++RequestsInFlight;
284-
}
285-
286-
void SendToQueues(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, bool timeStatsEnabled);
287-
288-
template<typename TEvent>
289-
void SendToQueues(TDeque<std::unique_ptr<TEvent>> &events, bool timeStatsEnabled) {
290-
for (auto& request : events) {
291-
ui64 messageCookie = request->Record.GetCookie();
292-
CountEvent(*request);
293-
TLogoBlobID id = GetBlobId(request);
294-
TVDiskID vDiskId = VDiskIDFromVDiskID(request->Record.GetVDiskID());
295-
LWTRACK(DSProxyPutVPutIsSent, request->Orbit, Info->GetFailDomainOrderNumber(vDiskId),
296-
Info->GroupID.GetRawId(), id.Channel(), id.PartId(), id.ToString(), id.BlobSize());
297-
SendToQueue(std::move(request), messageCookie, timeStatsEnabled);
298-
}
299-
}
300-
301-
template<typename TPtr>
302-
void ProcessReplyFromQueue(const TPtr& /*ev*/) {
303-
Y_ABORT_UNLESS(RequestsInFlight);
304-
--RequestsInFlight;
305-
CheckPostponedQueue();
306-
}
256+
void ProcessReplyFromQueue(IEventBase *ev);
307257

308-
TLogoBlobID GetBlobId(std::unique_ptr<TEvBlobStorage::TEvVPut> &ev);
309-
TLogoBlobID GetBlobId(std::unique_ptr<TEvBlobStorage::TEvVMultiPut> &ev);
310-
TLogoBlobID GetBlobId(std::unique_ptr<TEvBlobStorage::TEvVMovedPatch> &ev);
311-
TLogoBlobID GetBlobId(std::unique_ptr<TEvBlobStorage::TEvVPatchStart> &ev);
312-
TLogoBlobID GetBlobId(std::unique_ptr<TEvBlobStorage::TEvVPatchDiff> &ev);
258+
static TLogoBlobID GetBlobId(TEvBlobStorage::TEvVPut& ev);
259+
static TLogoBlobID GetBlobId(TEvBlobStorage::TEvVMultiPut& ev);
260+
static TLogoBlobID GetBlobId(TEvBlobStorage::TEvVMovedPatch& ev);
261+
static TLogoBlobID GetBlobId(TEvBlobStorage::TEvVPatchStart& ev);
262+
static TLogoBlobID GetBlobId(TEvBlobStorage::TEvVPatchDiff& ev);
313263

314264
void SendToProxy(std::unique_ptr<IEventBase> event, ui64 cookie = 0, NWilson::TTraceId traceId = {});
315265
void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie);

ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor
327327
}
328328

329329
void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) {
330-
ProcessReplyFromQueue(ev);
330+
ProcessReplyFromQueue(ev->Get());
331331

332332
const auto& record = ev->Get()->Record;
333333
const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());

ydb/core/blobstorage/dsproxy/dsproxy_block.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
2222
TGroupQuorumTracker QuorumTracker;
2323

2424
void Handle(TEvBlobStorage::TEvVBlockResult::TPtr &ev) {
25-
ProcessReplyFromQueue(ev);
25+
ProcessReplyFromQueue(ev->Get());
2626
const NKikimrBlobStorage::TEvVBlockResult &record = ev->Get()->Record;
2727
Y_ABORT_UNLESS(record.HasStatus());
2828
const NKikimrProto::EReplyStatus status = record.GetStatus();
@@ -47,7 +47,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
4747
}
4848

4949
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev) {
50-
ProcessReplyFromQueue(ev);
50+
ProcessReplyFromQueue(ev->Get());
5151
const auto& record = ev->Get()->Record;
5252
if (record.HasStatus() && record.HasVDiskID()) {
5353
Process(record.GetStatus(), VDiskIDFromVDiskID(record.GetVDiskID()), record.HasIncarnationGuid()

ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
3333
ui32 ResponsesReceived = 0;
3434

3535
void Handle(TEvBlobStorage::TEvVCollectGarbageResult::TPtr &ev) {
36-
ProcessReplyFromQueue(ev);
36+
ProcessReplyFromQueue(ev->Get());
3737
ResponsesReceived++;
3838
const NKikimrBlobStorage::TEvVCollectGarbageResult &record = ev->Get()->Record;
3939
Y_ABORT_UNLESS(record.HasStatus());
@@ -49,7 +49,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
4949
}
5050

5151
void Handle(TEvBlobStorage::TEvVStatusResult::TPtr &ev) {
52-
ProcessReplyFromQueue(ev);
52+
ProcessReplyFromQueue(ev->Get());
5353
ResponsesReceived++;
5454
const auto& record = ev->Get()->Record;
5555
if (record.HasStatus() && record.HasVDiskID()) {

ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
312312
}
313313

314314
void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) {
315-
ProcessReplyFromQueue(ev);
315+
ProcessReplyFromQueue(ev->Get());
316316

317317
TotalRecieved++;
318318
NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record;
@@ -354,8 +354,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
354354
}
355355

356356
void HandleIgnore(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
357-
ProcessReplyFromQueue(ev);
358-
CountEvent(*ev->Get());
357+
ProcessReplyFromQueue(ev->Get());
359358

360359
TotalRecieved++;
361360
NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
@@ -371,8 +370,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
371370
}
372371

373372
void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
374-
ProcessReplyFromQueue(ev);
375-
CountEvent(*ev->Get());
373+
ProcessReplyFromQueue(ev->Get());
376374

377375
TotalRecieved++;
378376
NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
@@ -708,7 +706,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
708706
<< " node# " << vdisk.NodeId()
709707
<< " msg# " << msg->ToString()
710708
<< " cookie# " << cookie);
711-
CountEvent(*msg);
712709
SendToQueue(std::move(msg), cookie);
713710
TotalSent++;
714711

@@ -934,7 +931,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
934931
<< " msg# " << msg->ToString()
935932
<< " cookie# " << cookie
936933
<< " ForceBlockedGeneration# " << msg->Record.GetForceBlockedGeneration());
937-
CountEvent(*msg);
938934
SendToQueue(std::move(msg), cookie);
939935
TotalSent++;
940936

ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,16 +525,14 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
525525
for (auto& msg : Msgs) {
526526
A_LOG_DEBUG_S("DSPDM07", "sending TEvVGet# " << msg->ToString());
527527

528-
CountEvent(*msg);
529528
SendToQueue(std::move(msg), 0);
530529
++RequestsInFlight;
531530
}
532531
Msgs.clear();
533532
}
534533

535534
void Handle(TEvBlobStorage::TEvVGetResult::TPtr& ev) {
536-
ProcessReplyFromQueue(ev);
537-
CountEvent(*ev->Get());
535+
ProcessReplyFromQueue(ev->Get());
538536

539537
Y_ABORT_UNLESS(RequestsInFlight > 0);
540538
--RequestsInFlight;
@@ -684,7 +682,7 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
684682
}
685683

686684
void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr& ev) {
687-
ProcessReplyFromQueue(ev);
685+
ProcessReplyFromQueue(ev->Get());
688686
Y_ABORT_UNLESS(RequestsInFlight > 0);
689687
--RequestsInFlight;
690688

ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ class TBlobStorageGroupMirror3of4DiscoverRequest : public TBlobStorageGroupReque
165165
}
166166

167167
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev) {
168-
ProcessReplyFromQueue(ev);
168+
ProcessReplyFromQueue(ev->Get());
169169
auto& record = ev->Get()->Record;
170170
if (!record.HasStatus() || !record.HasVDiskID()) {
171171
return ReplyAndDie(NKikimrProto::ERROR, "incorrect TEvVGetResult from VDisk");

ydb/core/blobstorage/dsproxy/dsproxy_get.cpp

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
103103
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts) {
104104
ReportBytes(GetImpl.GrabBytesToReport());
105105
RequestsSent += vGets.size() + vPuts.size();
106-
CountPuts(vPuts);
106+
for (const auto& vPut : vPuts) {
107+
CountPut(vPut->GetBufferBytes());
108+
}
107109
if (vPuts.size()) {
108110
if (!IsPutStarted) {
109111
IsPutStarted = true;
@@ -136,8 +138,14 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
136138
}
137139
DiskCounters[orderNumber].Sent++;
138140
}
139-
SendToQueues(vGets, false);
140-
SendToQueues(vPuts, false);
141+
for (auto& ev : vGets) {
142+
const ui64 cookie = ev->Record.GetCookie();
143+
SendToQueue(std::move(ev), cookie);
144+
}
145+
for (auto& ev : vPuts) {
146+
const ui64 cookie = ev->Record.GetCookie();
147+
SendToQueue(std::move(ev), cookie);
148+
}
141149
}
142150

143151
ui32 CountDisksWithActiveRequests() {
@@ -161,8 +169,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
161169
}
162170

163171
void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
164-
ProcessReplyFromQueue(ev);
165-
CountEvent(*ev->Get());
172+
ProcessReplyFromQueue(ev->Get());
166173

167174
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
168175
ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs);
@@ -251,7 +258,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
251258
}
252259

253260
void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
254-
ProcessReplyFromQueue(ev);
261+
ProcessReplyFromQueue(ev->Get());
255262
HandleVPutResult(ev);
256263
}
257264

ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,7 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA
7575
}
7676

7777
void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
78-
ProcessReplyFromQueue(ev);
79-
CountEvent(*ev->Get());
78+
ProcessReplyFromQueue(ev->Get());
8079

8180
const NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
8281

@@ -324,7 +323,6 @@ class TBlobStorageGroupIndexRestoreGetRequest : public TBlobStorageGroupRequestA
324323
auto sendQuery = [&] {
325324
if (vget) {
326325
const ui64 cookie = TVDiskIdShort(vd).GetRaw();
327-
CountEvent(*vget);
328326
SendToQueue(std::move(vget), cookie);
329327
vget.reset();
330328
++VGetsInFlight;

0 commit comments

Comments
 (0)