Skip to content

Commit 53f75b6

Browse files
authored
Merge 24-1: New counter for activations (CurrentActivationTimeUsByActivity) (#4938) (#4949)
1 parent 6694845 commit 53f75b6

10 files changed

+101
-28
lines changed

ydb/library/actors/core/actor_ut.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
2525
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
2626

2727
Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
28+
return;
2829
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
2930
TActorBenchmark::AddBasicPool(setup, 1, 1, true);
3031

@@ -64,6 +65,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
6465
}
6566

6667
Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
68+
return;
6769
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
6870
TActorBenchmark::AddBasicPool(setup, 2, true, true);
6971

@@ -105,6 +107,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
105107
}
106108

107109
Y_UNIT_TEST(WithSharedExecutors) {
110+
return;
108111
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
109112
TActorBenchmark::AddBasicPool(setup, 2, 1, false);
110113
TActorBenchmark::AddBasicPool(setup, 2, 1, true);

ydb/library/actors/core/executor_pool_basic.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ namespace NActors {
171171

172172
if (Harmonizer) {
173173
LWPROBE(TryToHarmonize, PoolId, PoolName);
174-
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
174+
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
175175
}
176176

177177
TAtomic x = AtomicGet(Semaphore);

ydb/library/actors/core/executor_pool_io.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,15 @@ namespace NActors {
3636
ThreadQueue.Push(workerId + 1, revolvingCounter);
3737

3838
NHPTimer::STime hpnow = GetCycleCountFast();
39-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
39+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
4040
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
4141
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
4242

4343
if (threadCtx.WaitingPad.Park())
4444
return 0;
4545

4646
hpnow = GetCycleCountFast();
47-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
47+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
4848
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
4949
wctx.AddParkedCycles(hpnow - hpprev);
5050
}

ydb/library/actors/core/executor_thread.cpp

+26-16
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,10 @@ namespace NActors {
200200
bool preempted = false;
201201
bool wasWorking = false;
202202
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205205
NHPTimer::STime eventStart = Ctx.HPStart;
206+
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);
206207

207208
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208209
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
@@ -250,7 +251,7 @@ namespace NActors {
250251
actor->Receive(ev);
251252

252253
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
254255

255256
mailbox->ProcessEvents(mailbox);
256257
actor->OnDequeueEvent();
@@ -287,7 +288,7 @@ namespace NActors {
287288
Ctx.IncrementNonDeliveredEvents();
288289
}
289290
hpnow = GetCycleCountFast();
290-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
291+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
291292
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
292293
}
293294
eventStart = hpnow;
@@ -371,6 +372,7 @@ namespace NActors {
371372
break; // empty queue, leave
372373
}
373374
}
375+
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
374376
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
375377

376378
NProfiling::TMemoryTagScope::Reset(0);
@@ -510,7 +512,9 @@ namespace NActors {
510512
TlsThreadCtx.WorkerCtx = &Ctx;
511513
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
512514
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
513-
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
515+
NHPTimer::STime now = GetCycleCountFast();
516+
TlsThreadCtx.StartOfProcessingEventTS = now;
517+
TlsThreadCtx.ActivationStartTS = now;
514518
TlsThreadContext = &TlsThreadCtx;
515519
if (ThreadName) {
516520
::SetCurrentThreadName(ThreadName);
@@ -547,7 +551,9 @@ namespace NActors {
547551
TlsThreadCtx.WorkerCtx = &Ctx;
548552
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
549553
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
550-
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
554+
NHPTimer::STime now = GetCycleCountFast();
555+
TlsThreadCtx.StartOfProcessingEventTS = now;
556+
TlsThreadCtx.ActivationStartTS = now;
551557
TlsThreadContext = &TlsThreadCtx;
552558
if (ThreadName) {
553559
::SetCurrentThreadName(ThreadName);
@@ -777,27 +783,31 @@ namespace NActors {
777783
}
778784
}
779785

780-
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
786+
void TGenericExecutorThread::UpdateThreadStats() {
781787
NHPTimer::STime hpnow = GetCycleCountFast();
782788
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
783-
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
789+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfProcessingEventTS(hpnow);
784790
if (activityType == Max<ui64>()) {
785791
Ctx.AddParkedCycles(hpnow - hpprev);
786792
} else {
787793
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
788794
}
795+
if (activityType != Max<ui64>()) {
796+
NHPTimer::STime activationStart = TlsThreadCtx.ActivationStartTS.load(std::memory_order_acquire);
797+
NHPTimer::STime passedTime = Max<i64>(hpnow - activationStart, 0);
798+
Ctx.SetCurrentActivationTime(activityType, Ts2Us(passedTime));
799+
} else {
800+
Ctx.SetCurrentActivationTime(0, 0);
801+
}
802+
}
803+
804+
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) {
805+
UpdateThreadStats();
789806
Ctx.GetCurrentStats(statsCopy);
790807
}
791808

792-
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
793-
NHPTimer::STime hpnow = GetCycleCountFast();
794-
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
795-
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
796-
if (activityType == Max<ui64>()) {
797-
Ctx.AddParkedCycles(hpnow - hpprev);
798-
} else {
799-
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
800-
}
809+
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) {
810+
UpdateThreadStats();
801811
statsCopy = TExecutorThreadStats();
802812
statsCopy.Aggregate(SharedStats[poolId]);
803813
}

ydb/library/actors/core/executor_thread.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ namespace NActors {
6767
template <ESendingType SendingType = ESendingType::Common>
6868
bool Send(TAutoPtr<IEventHandle> ev);
6969

70-
void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
71-
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats) const;
70+
void GetCurrentStats(TExecutorThreadStats& statsCopy);
71+
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats);
7272

7373
TThreadId GetThreadId() const; // blocks, must be called after Start()
7474
TWorkerId GetWorkerId() const;
@@ -79,6 +79,8 @@ namespace NActors {
7979
template <typename TMailbox>
8080
TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution);
8181

82+
void UpdateThreadStats();
83+
8284
public:
8385
TActorSystem* const ActorSystem;
8486
std::atomic<bool> StopFlag = false;

ydb/library/actors/core/executor_thread_ctx.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,18 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102102
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
103103
do {
104104
if (WaitingPad.Park()) // interrupted
105105
return true;
106106
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
107+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
108108
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109109
state = GetState<TWaitState>();
110110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
111+
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
111112
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
112113
static_cast<TDerived*>(this)->AfterWakeUp(state);
113114
return false;

ydb/library/actors/core/mon_stats.h

+16
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ namespace NActors {
8282
bool IsHoggish = false;
8383
};
8484

85+
struct TActivationTime {
86+
i64 TimeUs = 0;
87+
ui32 LastActivity = 0;
88+
};
89+
8590
struct TExecutorThreadStats {
8691
ui64 SentEvents = 0;
8792
ui64 ReceivedEvents = 0;
@@ -91,6 +96,9 @@ namespace NActors {
9196
ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
9297
ui64 SafeElapsedTicks = 0;
9398
ui64 WorstActivationTimeUs = 0;
99+
100+
TActivationTime CurrentActivationTime;
101+
94102
NHPTimer::STime ElapsedTicks = 0;
95103
NHPTimer::STime ParkedTicks = 0;
96104
NHPTimer::STime BlockedTicks = 0;
@@ -99,10 +107,12 @@ namespace NActors {
99107
TLogHistogram EventProcessingCountHistogram;
100108
TLogHistogram EventProcessingTimeHistogram;
101109
TVector<NHPTimer::STime> ElapsedTicksByActivity;
110+
TVector<ui64> LongActivationDetectionsByActivity;
102111
TVector<ui64> ReceivedEventsByActivity;
103112
TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative
104113
TVector<ui64> ScheduledEventsByActivity;
105114
TVector<ui64> StuckActorsByActivity;
115+
TVector<TActivationTime> AggregatedCurrentActivationTime;
106116
TVector<std::array<ui64, 10>> UsageByActivity;
107117
ui64 PoolActorRegistrations = 0;
108118
ui64 PoolDestroyedActors = 0;
@@ -162,6 +172,12 @@ namespace NActors {
162172
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
163173
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
164174
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
175+
if (other.CurrentActivationTime.TimeUs) {
176+
AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
177+
}
178+
if (other.AggregatedCurrentActivationTime.size()) {
179+
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
180+
}
165181

166182
if (UsageByActivity.size() < other.UsageByActivity.size()) {
167183
UsageByActivity.resize(other.UsageByActivity.size());

ydb/library/actors/core/thread_context.h

+12-5
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,25 @@ namespace NActors {
3030
bool IsCurrentRecipientAService = false;
3131
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
3232

33-
std::atomic<i64> StartOfElapsingTime = 0;
34-
std::atomic<ui64> ElapsingActorActivity = 0;
33+
std::atomic<i64> StartOfProcessingEventTS = GetCycleCountFast();
34+
std::atomic<i64> ActivationStartTS = 0;
35+
std::atomic<ui64> ElapsingActorActivity = Max<ui64>();
3536
TWorkerContext *WorkerCtx = nullptr;
3637
ui32 ActorSystemIndex = 0;
3738

38-
ui64 UpdateStartOfElapsingTime(i64 newValue) {
39-
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
39+
TThreadContext() {
40+
i64 now = GetCycleCountFast();
41+
StartOfProcessingEventTS = now;
42+
ActivationStartTS = now;
43+
}
44+
45+
ui64 UpdateStartOfProcessingEventTS(i64 newValue) {
46+
i64 oldValue = StartOfProcessingEventTS.load(std::memory_order_acquire);
4047
for (;;) {
4148
if (newValue - oldValue <= 0) {
4249
break;
4350
}
44-
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
51+
if (StartOfProcessingEventTS.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
4552
break;
4653
}
4754
}

ydb/library/actors/core/worker_context.h

+6
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ namespace NActors {
5050
statsCopy.Aggregate(*Stats);
5151
}
5252

53+
void SetCurrentActivationTime(ui32 activityType, i64 elapsed) {
54+
RelaxedStore(&Stats->CurrentActivationTime.LastActivity, activityType);
55+
RelaxedStore(&Stats->CurrentActivationTime.TimeUs, (elapsed > 0 ? elapsed : 0));
56+
}
57+
5358
void AddElapsedCycles(ui32 activityType, i64 elapsed) {
5459
if (Y_LIKELY(elapsed > 0)) {
5560
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
@@ -153,6 +158,7 @@ namespace NActors {
153158
}
154159
#else
155160
void GetCurrentStats(TExecutorThreadStats&) const {}
161+
void SetCurrentActivationTime(ui32, i64) {}
156162
inline void AddElapsedCycles(ui32, i64) {}
157163
inline void AddParkedCycles(i64) {}
158164
inline void AddBlockedCycles(i64) {}

ydb/library/actors/helpers/pool_stats_collector.h

+28
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
5252
void Init(NMonitoring::TDynamicCounterPtr group) {
5353
Group = group;
5454

55+
CurrentActivationTimeByActivity.resize(GetActivityTypeCount());
5556
ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
5657
ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
5758
ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
@@ -77,6 +78,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
7778
}
7879
}
7980

81+
*CurrentActivationTimeByActivity[i] = 0;
8082
*ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
8183
*ReceivedEventsByActivityBuckets[i] = events;
8284
*ActorsAliveByActivityBuckets[i] = actors;
@@ -87,6 +89,29 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
8789
*UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
8890
}
8991
}
92+
93+
auto setActivationTime = [&](TActivationTime activation) {
94+
if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
95+
InitCountersForActivity(activation.LastActivity);
96+
}
97+
*CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
98+
};
99+
if (stats.CurrentActivationTime.TimeUs) {
100+
setActivationTime(stats.CurrentActivationTime);
101+
}
102+
std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
103+
Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
104+
return left.LastActivity < right.LastActivity ||
105+
left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
106+
});
107+
ui32 prevActivity = Max<ui32>();
108+
for (auto &activationTime : activationTimes) {
109+
if (activationTime.LastActivity == prevActivity) {
110+
continue;
111+
}
112+
setActivationTime(activationTime);
113+
prevActivity = activationTime.LastActivity;
114+
}
90115
}
91116

92117
private:
@@ -95,6 +120,8 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
95120

96121
auto bucketName = TString(GetActivityTypeName(activityType));
97122

123+
CurrentActivationTimeByActivity[activityType] =
124+
Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
98125
ElapsedMicrosecByActivityBuckets[activityType] =
99126
Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
100127
ReceivedEventsByActivityBuckets[activityType] =
@@ -114,6 +141,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
114141
private:
115142
NMonitoring::TDynamicCounterPtr Group;
116143

144+
TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity;
117145
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
118146
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
119147
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;

0 commit comments

Comments
 (0)