6
6
7
7
#include < ydb/library/yverify_stream/yverify_stream.h>
8
8
#include < ydb/core/util/lz4_data_generator.h>
9
+ #include < ydb/core/jaeger_tracing/throttler.h>
9
10
10
11
#include < google/protobuf/text_format.h>
11
12
@@ -445,6 +446,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
445
446
// There is no point in having more than 1 active garbage collection request at the moment
446
447
constexpr static ui32 MaxGarbageCollectionsInFlight = 1 ;
447
448
449
+ TIntrusivePtr<NJaegerTracing::TThrottler> TracingThrottler;
450
+
448
451
public:
449
452
TTabletWriter (TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
450
453
TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel,
@@ -453,7 +456,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
453
456
NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings,
454
457
TIntervalGenerator garbageCollectIntervalGen,
455
458
TDuration scriptedRoundDuration, TVector<TReqInfo>&& scriptedRequests,
456
- const TInitialAllocation& initialAllocation)
459
+ const TInitialAllocation& initialAllocation,
460
+ const TIntrusivePtr<NJaegerTracing::TThrottler>& tracingThrottler)
457
461
: Self(self)
458
462
, TagCounters(counters->GetSubgroup (" tag" , Sprintf(" %" PRIu64, Self.Tag)))
459
463
, Counters(TagCounters->GetSubgroup (" channel" , Sprintf(" %" PRIu32, channel)))
@@ -491,6 +495,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
491
495
, ScriptedRequests(std::move(scriptedRequests))
492
496
, InitialAllocation(initialAllocation)
493
497
, GarbageCollectIntervalGen(garbageCollectIntervalGen)
498
+ , TracingThrottler(tracingThrottler)
494
499
{
495
500
*Counters->GetCounter (" tabletId" ) = tabletId;
496
501
const auto & percCounters = Counters->GetSubgroup (" sensor" , " microseconds" );
@@ -923,7 +928,13 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
923
928
IssueReadIfPossible (ctx);
924
929
}
925
930
};
926
- SendToBSProxy (ctx, GroupId, ev.release (), Self.QueryDispatcher .ObtainCookie (std::move (writeCallback)));
931
+
932
+ NWilson::TTraceId traceId = (TracingThrottler && !TracingThrottler->Throttle ())
933
+ ? NWilson::TTraceId::NewTraceId (15 , ::Max<ui32>())
934
+ : NWilson::TTraceId{};
935
+
936
+ SendToBSProxy (ctx, GroupId, ev.release (), Self.QueryDispatcher .ObtainCookie (std::move (writeCallback)),
937
+ std::move (traceId));
927
938
const auto nowCycles = GetCycleCountFast ();
928
939
WritesInFlightTimestamps.emplace_back (writeQueryId, nowCycles);
929
940
SentTimestamp.emplace (writeQueryId, nowCycles);
@@ -1073,7 +1084,12 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
1073
1084
IssueReadIfPossible (ctx);
1074
1085
};
1075
1086
1076
- SendToBSProxy (ctx, GroupId, ev.release (), Self.QueryDispatcher .ObtainCookie (std::move (readCallback)));
1087
+ NWilson::TTraceId traceId = (TracingThrottler && !TracingThrottler->Throttle ())
1088
+ ? NWilson::TTraceId::NewTraceId (15 , ::Max<ui32>())
1089
+ : NWilson::TTraceId{};
1090
+
1091
+ SendToBSProxy (ctx, GroupId, ev.release (), Self.QueryDispatcher .ObtainCookie (std::move (readCallback)),
1092
+ std::move (traceId));
1077
1093
ReadSentTimestamp.emplace (readQueryId, GetCycleCountFast ());
1078
1094
1079
1095
ReadSettings.InFlightTracker .Request (size);
@@ -1211,6 +1227,14 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
1211
1227
1212
1228
TIntervalGenerator garbageCollectIntervalGen (profile.GetFlushIntervals ());
1213
1229
1230
+ TIntrusivePtr<NJaegerTracing::TThrottler> tracingThrottler;
1231
+
1232
+ ui32 throttlerRate = profile.GetTracingThrottlerRate ();
1233
+ if (throttlerRate) {
1234
+ tracingThrottler = MakeIntrusive<NJaegerTracing::TThrottler>(throttlerRate, profile.GetTracingThrottlerBurst (),
1235
+ TAppData::TimeProvider);
1236
+ }
1237
+
1214
1238
for (const auto & tablet : profile.GetTablets ()) {
1215
1239
auto scriptedRoundDuration = TDuration::MicroSeconds (tablet.GetScriptedCycleDurationSec () * 1e6 );
1216
1240
TVector<TReqInfo> scriptedRequests;
@@ -1256,7 +1280,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
1256
1280
getHandleClass, readSettings,
1257
1281
garbageCollectIntervalGen,
1258
1282
scriptedRoundDuration, std::move (scriptedRequests),
1259
- initialAllocation));
1283
+ initialAllocation, tracingThrottler ));
1260
1284
1261
1285
WorkersInInitialState++;
1262
1286
}
0 commit comments