Skip to content

Merge 24-1-14-hotfix: New counter for activations (CurrentActivationTimeUsByActivity) (#4938) #4948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/library/actors/core/actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;

Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 1, 1, true);

Expand Down Expand Up @@ -64,6 +65,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}

Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, true, true);

Expand Down Expand Up @@ -105,6 +107,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
}

Y_UNIT_TEST(WithSharedExecutors) {
return;
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
TActorBenchmark::AddBasicPool(setup, 2, 1, false);
TActorBenchmark::AddBasicPool(setup, 2, 1, true);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ namespace NActors {

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
}

TAtomic x = AtomicGet(Semaphore);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ namespace NActors {
ThreadQueue.Push(workerId + 1, revolvingCounter);

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}
Expand Down
42 changes: 26 additions & 16 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ namespace NActors {
bool preempted = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
NHPTimer::STime eventStart = Ctx.HPStart;
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
Expand Down Expand Up @@ -250,7 +251,7 @@ namespace NActors {
actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();
Expand Down Expand Up @@ -287,7 +288,7 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}
eventStart = hpnow;
Expand Down Expand Up @@ -371,6 +372,7 @@ namespace NActors {
break; // empty queue, leave
}
}
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

NProfiling::TMemoryTagScope::Reset(0);
Expand Down Expand Up @@ -510,7 +512,9 @@ namespace NActors {
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
NHPTimer::STime now = GetCycleCountFast();
TlsThreadCtx.StartOfProcessingEventTS = now;
TlsThreadCtx.ActivationStartTS = now;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
Expand Down Expand Up @@ -547,7 +551,9 @@ namespace NActors {
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
NHPTimer::STime now = GetCycleCountFast();
TlsThreadCtx.StartOfProcessingEventTS = now;
TlsThreadCtx.ActivationStartTS = now;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
Expand Down Expand Up @@ -777,27 +783,31 @@ namespace NActors {
}
}

void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
void TGenericExecutorThread::UpdateThreadStats() {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfProcessingEventTS(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
if (activityType != Max<ui64>()) {
NHPTimer::STime activationStart = TlsThreadCtx.ActivationStartTS.load(std::memory_order_acquire);
NHPTimer::STime passedTime = Max<i64>(hpnow - activationStart, 0);
Ctx.SetCurrentActivationTime(activityType, Ts2Us(passedTime));
} else {
Ctx.SetCurrentActivationTime(0, 0);
}
}

void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) {
UpdateThreadStats();
Ctx.GetCurrentStats(statsCopy);
}

void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) {
UpdateThreadStats();
statsCopy = TExecutorThreadStats();
statsCopy.Aggregate(SharedStats[poolId]);
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
bool Send(TAutoPtr<IEventHandle> ev);

void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats) const;
void GetCurrentStats(TExecutorThreadStats& statsCopy);
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats);

TThreadId GetThreadId() const; // blocks, must be called after Start()
TWorkerId GetWorkerId() const;
Expand All @@ -79,6 +79,8 @@ namespace NActors {
template <typename TMailbox>
TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution);

void UpdateThreadStats();

public:
TActorSystem* const ActorSystem;
std::atomic<bool> StopFlag = false;
Expand Down
5 changes: 3 additions & 2 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,18 @@ namespace NActors {
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
if (WaitingPad.Park()) // interrupted
return true;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
Expand Down
16 changes: 16 additions & 0 deletions ydb/library/actors/core/mon_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ namespace NActors {
bool IsHoggish = false;
};

struct TActivationTime {
i64 TimeUs = 0;
ui32 LastActivity = 0;
};

struct TExecutorThreadStats {
ui64 SentEvents = 0;
ui64 ReceivedEvents = 0;
Expand All @@ -91,6 +96,9 @@ namespace NActors {
ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
ui64 SafeElapsedTicks = 0;
ui64 WorstActivationTimeUs = 0;

TActivationTime CurrentActivationTime;

NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
NHPTimer::STime BlockedTicks = 0;
Expand All @@ -99,10 +107,12 @@ namespace NActors {
TLogHistogram EventProcessingCountHistogram;
TLogHistogram EventProcessingTimeHistogram;
TVector<NHPTimer::STime> ElapsedTicksByActivity;
TVector<ui64> LongActivationDetectionsByActivity;
TVector<ui64> ReceivedEventsByActivity;
TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative
TVector<ui64> ScheduledEventsByActivity;
TVector<ui64> StuckActorsByActivity;
TVector<TActivationTime> AggregatedCurrentActivationTime;
TVector<std::array<ui64, 10>> UsageByActivity;
ui64 PoolActorRegistrations = 0;
ui64 PoolDestroyedActors = 0;
Expand Down Expand Up @@ -162,6 +172,12 @@ namespace NActors {
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
if (other.CurrentActivationTime.TimeUs) {
AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
}
if (other.AggregatedCurrentActivationTime.size()) {
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
}

if (UsageByActivity.size() < other.UsageByActivity.size()) {
UsageByActivity.resize(other.UsageByActivity.size());
Expand Down
17 changes: 12 additions & 5 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@ namespace NActors {
bool IsCurrentRecipientAService = false;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<i64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
std::atomic<i64> StartOfProcessingEventTS = GetCycleCountFast();
std::atomic<i64> ActivationStartTS = 0;
std::atomic<ui64> ElapsingActorActivity = Max<ui64>();
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;

ui64 UpdateStartOfElapsingTime(i64 newValue) {
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
TThreadContext() {
i64 now = GetCycleCountFast();
StartOfProcessingEventTS = now;
ActivationStartTS = now;
}

ui64 UpdateStartOfProcessingEventTS(i64 newValue) {
i64 oldValue = StartOfProcessingEventTS.load(std::memory_order_acquire);
for (;;) {
if (newValue - oldValue <= 0) {
break;
}
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
if (StartOfProcessingEventTS.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
break;
}
}
Expand Down
6 changes: 6 additions & 0 deletions ydb/library/actors/core/worker_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ namespace NActors {
statsCopy.Aggregate(*Stats);
}

void SetCurrentActivationTime(ui32 activityType, i64 elapsed) {
RelaxedStore(&Stats->CurrentActivationTime.LastActivity, activityType);
RelaxedStore(&Stats->CurrentActivationTime.TimeUs, (elapsed > 0 ? elapsed : 0));
}

void AddElapsedCycles(ui32 activityType, i64 elapsed) {
if (Y_LIKELY(elapsed > 0)) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
Expand Down Expand Up @@ -153,6 +158,7 @@ namespace NActors {
}
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
void SetCurrentActivationTime(ui32, i64) {}
inline void AddElapsedCycles(ui32, i64) {}
inline void AddParkedCycles(i64) {}
inline void AddBlockedCycles(i64) {}
Expand Down
28 changes: 28 additions & 0 deletions ydb/library/actors/helpers/pool_stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
void Init(NMonitoring::TDynamicCounterPtr group) {
Group = group;

CurrentActivationTimeByActivity.resize(GetActivityTypeCount());
ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
Expand All @@ -77,6 +78,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
}
}

*CurrentActivationTimeByActivity[i] = 0;
*ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
*ReceivedEventsByActivityBuckets[i] = events;
*ActorsAliveByActivityBuckets[i] = actors;
Expand All @@ -87,6 +89,29 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
*UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
}
}

auto setActivationTime = [&](TActivationTime activation) {
if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
InitCountersForActivity(activation.LastActivity);
}
*CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
};
if (stats.CurrentActivationTime.TimeUs) {
setActivationTime(stats.CurrentActivationTime);
}
std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
return left.LastActivity < right.LastActivity ||
left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
});
ui32 prevActivity = Max<ui32>();
for (auto &activationTime : activationTimes) {
if (activationTime.LastActivity == prevActivity) {
continue;
}
setActivationTime(activationTime);
prevActivity = activationTime.LastActivity;
}
}

private:
Expand All @@ -95,6 +120,8 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {

auto bucketName = TString(GetActivityTypeName(activityType));

CurrentActivationTimeByActivity[activityType] =
Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
ElapsedMicrosecByActivityBuckets[activityType] =
Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
ReceivedEventsByActivityBuckets[activityType] =
Expand All @@ -114,6 +141,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
private:
NMonitoring::TDynamicCounterPtr Group;

TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
Expand Down
Loading