Skip to content

Add tracing to load actor #3395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions ydb/core/load_test/group_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <ydb/library/yverify_stream/yverify_stream.h>
#include <ydb/core/util/lz4_data_generator.h>
#include <ydb/core/jaeger_tracing/throttler.h>

#include <google/protobuf/text_format.h>

Expand Down Expand Up @@ -445,6 +446,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
// There is no point in having more than 1 active garbage collection request at the moment
constexpr static ui32 MaxGarbageCollectionsInFlight = 1;

TIntrusivePtr<NJaegerTracing::TThrottler> TracingThrottler;

public:
TTabletWriter(TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
TLogWriterLoadTestActor& self, ui64 tabletId, ui32 channel,
Expand All @@ -453,7 +456,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
NKikimrBlobStorage::EGetHandleClass getHandleClass, const TRequestDispatchingSettings& readSettings,
TIntervalGenerator garbageCollectIntervalGen,
TDuration scriptedRoundDuration, TVector<TReqInfo>&& scriptedRequests,
const TInitialAllocation& initialAllocation)
const TInitialAllocation& initialAllocation,
const TIntrusivePtr<NJaegerTracing::TThrottler>& tracingThrottler)
: Self(self)
, TagCounters(counters->GetSubgroup("tag", Sprintf("%" PRIu64, Self.Tag)))
, Counters(TagCounters->GetSubgroup("channel", Sprintf("%" PRIu32, channel)))
Expand Down Expand Up @@ -491,6 +495,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
, ScriptedRequests(std::move(scriptedRequests))
, InitialAllocation(initialAllocation)
, GarbageCollectIntervalGen(garbageCollectIntervalGen)
, TracingThrottler(tracingThrottler)
{
*Counters->GetCounter("tabletId") = tabletId;
const auto& percCounters = Counters->GetSubgroup("sensor", "microseconds");
Expand Down Expand Up @@ -923,7 +928,13 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
IssueReadIfPossible(ctx);
}
};
SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(writeCallback)));

NWilson::TTraceId traceId = (TracingThrottler && !TracingThrottler->Throttle())
? NWilson::TTraceId::NewTraceId(15, ::Max<ui32>())
: NWilson::TTraceId{};

SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(writeCallback)),
std::move(traceId));
const auto nowCycles = GetCycleCountFast();
WritesInFlightTimestamps.emplace_back(writeQueryId, nowCycles);
SentTimestamp.emplace(writeQueryId, nowCycles);
Expand Down Expand Up @@ -1073,7 +1084,12 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
IssueReadIfPossible(ctx);
};

SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(readCallback)));
NWilson::TTraceId traceId = (TracingThrottler && !TracingThrottler->Throttle())
? NWilson::TTraceId::NewTraceId(15, ::Max<ui32>())
: NWilson::TTraceId{};

SendToBSProxy(ctx, GroupId, ev.release(), Self.QueryDispatcher.ObtainCookie(std::move(readCallback)),
std::move(traceId));
ReadSentTimestamp.emplace(readQueryId, GetCycleCountFast());

ReadSettings.InFlightTracker.Request(size);
Expand Down Expand Up @@ -1211,6 +1227,14 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo

TIntervalGenerator garbageCollectIntervalGen(profile.GetFlushIntervals());

TIntrusivePtr<NJaegerTracing::TThrottler> tracingThrottler;

ui32 throttlerRate = profile.GetTracingThrottlerRate();
if (throttlerRate) {
tracingThrottler = MakeIntrusive<NJaegerTracing::TThrottler>(throttlerRate, profile.GetTracingThrottlerBurst(),
TAppData::TimeProvider);
}

for (const auto& tablet : profile.GetTablets()) {
auto scriptedRoundDuration = TDuration::MicroSeconds(tablet.GetScriptedCycleDurationSec() * 1e6);
TVector<TReqInfo> scriptedRequests;
Expand Down Expand Up @@ -1256,7 +1280,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
getHandleClass, readSettings,
garbageCollectIntervalGen,
scriptedRoundDuration, std::move(scriptedRequests),
initialAllocation));
initialAllocation, tracingThrottler));

WorkersInInitialState++;
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/load_test/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ PEERDIR(
ydb/core/blobstorage/pdisk
ydb/core/control
ydb/core/keyvalue
ydb/core/jaeger_tracing
ydb/core/kqp/common
ydb/core/kqp/rm_service
ydb/core/tx/columnshard
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/load_test.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ message TEvLoadTestRequest {
optional THardRateDispatcherInfo ReadHardRateDispatcher = 16;

optional TInitialBlobAllocation InitialAllocation = 17;

optional uint32 TracingThrottlerRate = 18 [default = 0];
optional uint32 TracingThrottlerBurst = 19 [default = 0];
};
optional uint64 Tag = 1;
optional uint32 DurationSeconds = 2;
Expand Down