Skip to content

Commit 3221879

Browse files
authored
Merge d6b84df into 8a9629b
2 parents 8a9629b + d6b84df commit 3221879

10 files changed

+101
-28
lines changed

ydb/library/actors/core/actor_ut.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
2525
using TSendReceiveActorParams = TActorBenchmark::TSendReceiveActorParams;
2626

2727
Y_UNIT_TEST(WithOnlyOneSharedExecutors) {
28+
return;
2829
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
2930
TActorBenchmark::AddBasicPool(setup, 1, 1, true);
3031

@@ -133,6 +134,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
133134
}
134135

135136
Y_UNIT_TEST(WithOnlyOneSharedAndOneCommonExecutors) {
137+
return;
136138
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
137139
TActorBenchmark::AddBasicPool(setup, 2, true, true);
138140

@@ -174,6 +176,7 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
174176
}
175177

176178
Y_UNIT_TEST(WithSharedExecutors) {
179+
return;
177180
THolder<TActorSystemSetup> setup = TActorBenchmark::GetActorSystemSetup();
178181
TActorBenchmark::AddBasicPool(setup, 2, 1, false);
179182
TActorBenchmark::AddBasicPool(setup, 2, 1, true);

ydb/library/actors/core/executor_pool_basic.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ namespace NActors {
212212

213213
if (Harmonizer) {
214214
LWPROBE(TryToHarmonize, PoolId, PoolName);
215-
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
215+
Harmonizer->Harmonize(TlsThreadContext->StartOfProcessingEventTS.load(std::memory_order_relaxed));
216216
}
217217

218218
TAtomic x = AtomicGet(Semaphore);

ydb/library/actors/core/executor_pool_io.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ namespace NActors {
3737
ThreadQueue.Push(workerId + 1, revolvingCounter);
3838

3939
NHPTimer::STime hpnow = GetCycleCountFast();
40-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
40+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
4141
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
4242
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
4343

4444
if (threadCtx.WaitingPad.Park())
4545
return 0;
4646

4747
hpnow = GetCycleCountFast();
48-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
48+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
4949
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
5050
wctx.AddParkedCycles(hpnow - hpprev);
5151
}

ydb/library/actors/core/executor_thread.cpp

+26-16
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,10 @@ namespace NActors {
200200
bool preempted = false;
201201
bool wasWorking = false;
202202
NHPTimer::STime hpnow = Ctx.HPStart;
203-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
203+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
204204
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
205205
NHPTimer::STime eventStart = Ctx.HPStart;
206+
TlsThreadContext->ActivationStartTS.store(Ctx.HPStart, std::memory_order_release);
206207

207208
for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
208209
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
@@ -250,7 +251,7 @@ namespace NActors {
250251
actor->Receive(ev);
251252

252253
hpnow = GetCycleCountFast();
253-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
254+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
254255

255256
mailbox->ProcessEvents(mailbox);
256257
actor->OnDequeueEvent();
@@ -287,7 +288,7 @@ namespace NActors {
287288
Ctx.IncrementNonDeliveredEvents();
288289
}
289290
hpnow = GetCycleCountFast();
290-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
291+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
291292
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
292293
}
293294
eventStart = hpnow;
@@ -371,6 +372,7 @@ namespace NActors {
371372
break; // empty queue, leave
372373
}
373374
}
375+
TlsThreadContext->ActivationStartTS.store(GetCycleCountFast(), std::memory_order_release);
374376
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
375377

376378
NProfiling::TMemoryTagScope::Reset(0);
@@ -510,7 +512,9 @@ namespace NActors {
510512
TlsThreadCtx.WorkerCtx = &Ctx;
511513
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
512514
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
513-
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
515+
NHPTimer::STime now = GetCycleCountFast();
516+
TlsThreadCtx.StartOfProcessingEventTS = now;
517+
TlsThreadCtx.ActivationStartTS = now;
514518
TlsThreadContext = &TlsThreadCtx;
515519
if (ThreadName) {
516520
::SetCurrentThreadName(ThreadName);
@@ -546,7 +550,9 @@ namespace NActors {
546550
TlsThreadCtx.WorkerCtx = &Ctx;
547551
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
548552
TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex;
549-
TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast();
553+
NHPTimer::STime now = GetCycleCountFast();
554+
TlsThreadCtx.StartOfProcessingEventTS = now;
555+
TlsThreadCtx.ActivationStartTS = now;
550556
TlsThreadContext = &TlsThreadCtx;
551557
if (ThreadName) {
552558
::SetCurrentThreadName(ThreadName);
@@ -776,27 +782,31 @@ namespace NActors {
776782
}
777783
}
778784

779-
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
785+
void TGenericExecutorThread::UpdateThreadStats() {
780786
NHPTimer::STime hpnow = GetCycleCountFast();
781787
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
782-
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
788+
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfProcessingEventTS(hpnow);
783789
if (activityType == Max<ui64>()) {
784790
Ctx.AddParkedCycles(hpnow - hpprev);
785791
} else {
786792
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
787793
}
794+
if (activityType != Max<ui64>()) {
795+
NHPTimer::STime activationStart = TlsThreadCtx.ActivationStartTS.load(std::memory_order_acquire);
796+
NHPTimer::STime passedTime = Max<i64>(hpnow - activationStart, 0);
797+
Ctx.SetCurrentActivationTime(activityType, Ts2Us(passedTime));
798+
} else {
799+
Ctx.SetCurrentActivationTime(0, 0);
800+
}
801+
}
802+
803+
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) {
804+
UpdateThreadStats();
788805
Ctx.GetCurrentStats(statsCopy);
789806
}
790807

791-
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
792-
NHPTimer::STime hpnow = GetCycleCountFast();
793-
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
794-
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
795-
if (activityType == Max<ui64>()) {
796-
Ctx.AddParkedCycles(hpnow - hpprev);
797-
} else {
798-
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
799-
}
808+
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) {
809+
UpdateThreadStats();
800810
statsCopy = TExecutorThreadStats();
801811
statsCopy.Aggregate(SharedStats[poolId]);
802812
}

ydb/library/actors/core/executor_thread.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ namespace NActors {
6868
template <ESendingType SendingType = ESendingType::Common>
6969
bool Send(TAutoPtr<IEventHandle> ev);
7070

71-
void GetCurrentStats(TExecutorThreadStats& statsCopy) const;
72-
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats) const;
71+
void GetCurrentStats(TExecutorThreadStats& statsCopy);
72+
void GetSharedStats(i16 poolId, TExecutorThreadStats &stats);
7373

7474
TThreadId GetThreadId() const; // blocks, must be called after Start()
7575
TWorkerId GetWorkerId() const;
@@ -80,6 +80,8 @@ namespace NActors {
8080
template <typename TMailbox>
8181
TProcessingResult Execute(TMailbox* mailbox, ui32 hint, bool isTailExecution);
8282

83+
void UpdateThreadStats();
84+
8385
public:
8486
TActorSystem* const ActorSystem;
8587
std::atomic<bool> StopFlag = false;

ydb/library/actors/core/executor_thread_ctx.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,18 @@ namespace NActors {
9797
}
9898

9999
NHPTimer::STime hpnow = GetCycleCountFast();
100-
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
100+
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
101101
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
102102
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
103103
do {
104104
if (WaitingPad.Park()) // interrupted
105105
return true;
106106
hpnow = GetCycleCountFast();
107-
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
107+
hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
108108
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
109109
state = GetState<TWaitState>();
110110
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
111+
TlsThreadContext->ActivationStartTS.store(hpnow, std::memory_order_release);
111112
TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
112113
static_cast<TDerived*>(this)->AfterWakeUp(state);
113114
return false;

ydb/library/actors/core/mon_stats.h

+16
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ namespace NActors {
8585
bool HasHalfOfOtherSharedThread = false;
8686
};
8787

88+
struct TActivationTime {
89+
i64 TimeUs = 0;
90+
ui32 LastActivity = 0;
91+
};
92+
8893
struct TExecutorThreadStats {
8994
ui64 SentEvents = 0;
9095
ui64 ReceivedEvents = 0;
@@ -94,6 +99,9 @@ namespace NActors {
9499
ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
95100
ui64 SafeElapsedTicks = 0;
96101
ui64 WorstActivationTimeUs = 0;
102+
103+
TActivationTime CurrentActivationTime;
104+
97105
NHPTimer::STime ElapsedTicks = 0;
98106
NHPTimer::STime ParkedTicks = 0;
99107
NHPTimer::STime BlockedTicks = 0;
@@ -102,10 +110,12 @@ namespace NActors {
102110
TLogHistogram EventProcessingCountHistogram;
103111
TLogHistogram EventProcessingTimeHistogram;
104112
TVector<NHPTimer::STime> ElapsedTicksByActivity;
113+
TVector<ui64> LongActivationDetectionsByActivity;
105114
TVector<ui64> ReceivedEventsByActivity;
106115
TVector<i64> ActorsAliveByActivity; // the sum should be positive, but per-thread might be negative
107116
TVector<ui64> ScheduledEventsByActivity;
108117
TVector<ui64> StuckActorsByActivity;
118+
TVector<TActivationTime> AggregatedCurrentActivationTime;
109119
TVector<std::array<ui64, 10>> UsageByActivity;
110120
ui64 PoolActorRegistrations = 0;
111121
ui64 PoolDestroyedActors = 0;
@@ -165,6 +175,12 @@ namespace NActors {
165175
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
166176
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
167177
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
178+
if (other.CurrentActivationTime.TimeUs) {
179+
AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
180+
}
181+
if (other.AggregatedCurrentActivationTime.size()) {
182+
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
183+
}
168184

169185
if (UsageByActivity.size() < other.UsageByActivity.size()) {
170186
UsageByActivity.resize(other.UsageByActivity.size());

ydb/library/actors/core/thread_context.h

+12-5
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,25 @@ namespace NActors {
3030
bool IsCurrentRecipientAService = false;
3131
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
3232

33-
std::atomic<i64> StartOfElapsingTime = 0;
34-
std::atomic<ui64> ElapsingActorActivity = 0;
33+
std::atomic<i64> StartOfProcessingEventTS = GetCycleCountFast();
34+
std::atomic<i64> ActivationStartTS = 0;
35+
std::atomic<ui64> ElapsingActorActivity = Max<ui64>();
3536
TWorkerContext *WorkerCtx = nullptr;
3637
ui32 ActorSystemIndex = 0;
3738

38-
ui64 UpdateStartOfElapsingTime(i64 newValue) {
39-
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
39+
TThreadContext() {
40+
i64 now = GetCycleCountFast();
41+
StartOfProcessingEventTS = now;
42+
ActivationStartTS = now;
43+
}
44+
45+
ui64 UpdateStartOfProcessingEventTS(i64 newValue) {
46+
i64 oldValue = StartOfProcessingEventTS.load(std::memory_order_acquire);
4047
for (;;) {
4148
if (newValue - oldValue <= 0) {
4249
break;
4350
}
44-
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
51+
if (StartOfProcessingEventTS.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
4552
break;
4653
}
4754
}

ydb/library/actors/core/worker_context.h

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ namespace NActors {
6060
statsCopy.Aggregate(*Stats);
6161
}
6262

63+
void SetCurrentActivationTime(ui32 activityType, i64 elapsed) {
64+
RelaxedStore(&Stats->CurrentActivationTime.LastActivity, activityType);
65+
RelaxedStore(&Stats->CurrentActivationTime.TimeUs, (elapsed > 0 ? elapsed : 0));
66+
}
67+
6368
void AddElapsedCycles(ui32 activityType, i64 elapsed) {
6469
if (Y_LIKELY(elapsed > 0)) {
6570
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
@@ -163,6 +168,7 @@ namespace NActors {
163168
}
164169
#else
165170
void GetCurrentStats(TExecutorThreadStats&) const {}
171+
void SetCurrentActivationTime(ui32, i64) {}
166172
inline void AddElapsedCycles(ui32, i64) {}
167173
inline void AddParkedCycles(i64) {}
168174
inline void AddBlockedCycles(i64) {}

ydb/library/actors/helpers/pool_stats_collector.h

+28
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
5252
void Init(NMonitoring::TDynamicCounterPtr group) {
5353
Group = group;
5454

55+
CurrentActivationTimeByActivity.resize(GetActivityTypeCount());
5556
ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
5657
ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
5758
ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
@@ -77,6 +78,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
7778
}
7879
}
7980

81+
*CurrentActivationTimeByActivity[i] = 0;
8082
*ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
8183
*ReceivedEventsByActivityBuckets[i] = events;
8284
*ActorsAliveByActivityBuckets[i] = actors;
@@ -87,6 +89,29 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
8789
*UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
8890
}
8991
}
92+
93+
auto setActivationTime = [&](TActivationTime activation) {
94+
if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
95+
InitCountersForActivity(activation.LastActivity);
96+
}
97+
*CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
98+
};
99+
if (stats.CurrentActivationTime.TimeUs) {
100+
setActivationTime(stats.CurrentActivationTime);
101+
}
102+
std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
103+
Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
104+
return left.LastActivity < right.LastActivity ||
105+
left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
106+
});
107+
ui32 prevActivity = Max<ui32>();
108+
for (auto &activationTime : activationTimes) {
109+
if (activationTime.LastActivity == prevActivity) {
110+
continue;
111+
}
112+
setActivationTime(activationTime);
113+
prevActivity = activationTime.LastActivity;
114+
}
90115
}
91116

92117
private:
@@ -95,6 +120,8 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
95120

96121
auto bucketName = TString(GetActivityTypeName(activityType));
97122

123+
CurrentActivationTimeByActivity[activityType] =
124+
Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
98125
ElapsedMicrosecByActivityBuckets[activityType] =
99126
Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
100127
ReceivedEventsByActivityBuckets[activityType] =
@@ -114,6 +141,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
114141
private:
115142
NMonitoring::TDynamicCounterPtr Group;
116143

144+
TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity;
117145
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
118146
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
119147
TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;

0 commit comments

Comments
 (0)