Skip to content

Commit a2426cc

Browse files
authored
Move realization of pool stats collector to cpp files (#15510)
1 parent 7a338da commit a2426cc

File tree

6 files changed

+576
-470
lines changed

6 files changed

+576
-470
lines changed

ydb/core/base/pool_stats_collector.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <ydb/library/actors/core/actor_bootstrapped.h>
1010
#include <ydb/library/actors/helpers/pool_stats_collector.h>
11+
#include <ydb/library/actors/helpers/collector_counters.h>
1112

1213
#include <ydb/core/graph/api/service.h>
1314
#include <ydb/core/graph/api/events.h>
@@ -23,7 +24,7 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
2324
::NMonitoring::TDynamicCounterPtr counters)
2425
: NActors::TStatsCollectingActor(intervalSec, setup, GetServiceCounters(counters, "utils"))
2526
{
26-
MiniKQLPoolStats.Init(Counters.Get());
27+
MiniKQLPoolStats.Init(counters.Get());
2728
}
2829

2930
private:
@@ -57,7 +58,7 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
5758
auto systemUpdate = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate>();
5859
ui32 coresTotal = 0;
5960
double coresUsed = 0;
60-
for (const auto& pool : PoolCounters) {
61+
for (const auto& pool : GetPoolCounters()) {
6162
auto& pb = *systemUpdate->Record.AddPoolStats();
6263
pb.SetName(pool.Name);
6364
pb.SetUsage(pool.Usage);
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
#include "collector_counters.h"
2+
3+
namespace NActors {
4+
5+
// THistogramCounters
6+
7+
// Registers the counters that back a legacy power-of-two histogram.
//
// Under group/"sensor"=baseName one counter is created for every power of two
// that does not exceed maxVal, labelled range="<2^k> <unit>", followed by a
// final range="INF" counter. Counters are appended to Buckets in that order.
// The trailing `true` is forwarded to GetNamedCounter unchanged
// (presumably the "derivative" flag -- confirm against TDynamicCounters).
void THistogramCounters::Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
    // Every bucket lives in the same subgroup, so resolve it only once.
    const auto sensorGroup = group->GetSubgroup("sensor", baseName);

    // A ui64 has 64 bits: bounding the exponent keeps `1ull << exponent`
    // well defined and the loop finite even when maxVal >= 2^63.
    constexpr size_t exponentLimit = 64;
    for (size_t exponent = 0; exponent < exponentLimit && (1ull << exponent) <= maxVal; ++exponent) {
        const TString bucketName = ToString(1ull << exponent) + " " + unit;
        Buckets.push_back(sensorGroup->GetNamedCounter("range", bucketName, true));
    }

    // Catch-all bucket for everything above the last power of two.
    Buckets.push_back(sensorGroup->GetNamedCounter("range", "INF", true));
}
14+
15+
void THistogramCounters::Set(const TLogHistogram& data) {
16+
ui32 i = 0;
17+
for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
18+
*Buckets[i] = data.Buckets[i];
19+
ui64 last = 0;
20+
for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
21+
last += data.Buckets[i];
22+
*Buckets.back() = last;
23+
}
24+
25+
void THistogramCounters::Set(const TLogHistogram& data, double factor) {
26+
ui32 i = 0;
27+
for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
28+
*Buckets[i] = data.Buckets[i]*factor;
29+
ui64 last = 0;
30+
for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
31+
last += data.Buckets[i];
32+
*Buckets.back() = last*factor;
33+
}
34+
35+
// TActivityStats
36+
37+
// Remembers the counter group and sizes every per-activity counter table.
//
// Only the tables are sized here; their slots are filled by
// InitCountersForActivity().
void TActivityStats::Init(NMonitoring::TDynamicCounterPtr group) {
    Group = group;

    // Take a single snapshot of the activity count so that all tables are
    // guaranteed to end up with exactly the same length.
    const auto activityCount = GetActivityTypeCount();

    CurrentActivationTimeByActivity.resize(activityCount);
    ElapsedMicrosecByActivityBuckets.resize(activityCount);
    ReceivedEventsByActivityBuckets.resize(activityCount);
    ActorsAliveByActivityBuckets.resize(activityCount);
    ScheduledEventsByActivityBuckets.resize(activityCount);
    StuckActorsByActivityBuckets.resize(activityCount);
    UsageByActivityBuckets.resize(activityCount);
}
48+
49+
void TActivityStats::Set(const TExecutorThreadStats& stats) {
50+
for (ui32 i : xrange(stats.MaxActivityType())) {
51+
Y_ABORT_UNLESS(i < GetActivityTypeCount());
52+
ui64 ticks = stats.ElapsedTicksByActivity[i];
53+
ui64 events = stats.ReceivedEventsByActivity[i];
54+
ui64 actors = stats.ActorsAliveByActivity[i];
55+
ui64 scheduled = stats.ScheduledEventsByActivity[i];
56+
ui64 stuck = stats.StuckActorsByActivity[i];
57+
58+
if (!ActorsAliveByActivityBuckets[i]) {
59+
if (ticks || events || actors || scheduled) {
60+
InitCountersForActivity(i);
61+
} else {
62+
continue;
63+
}
64+
}
65+
66+
*CurrentActivationTimeByActivity[i] = 0;
67+
*ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
68+
*ReceivedEventsByActivityBuckets[i] = events;
69+
*ActorsAliveByActivityBuckets[i] = actors;
70+
*ScheduledEventsByActivityBuckets[i] = scheduled;
71+
*StuckActorsByActivityBuckets[i] = stuck;
72+
73+
for (ui32 j = 0; j < 10; ++j) {
74+
*UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
75+
}
76+
}
77+
78+
auto setActivationTime = [&](TActivationTime activation) {
79+
if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
80+
InitCountersForActivity(activation.LastActivity);
81+
}
82+
*CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
83+
};
84+
if (stats.CurrentActivationTime.TimeUs) {
85+
setActivationTime(stats.CurrentActivationTime);
86+
}
87+
std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
88+
Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
89+
return left.LastActivity < right.LastActivity ||
90+
left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
91+
});
92+
ui32 prevActivity = Max<ui32>();
93+
for (auto &activationTime : activationTimes) {
94+
if (activationTime.LastActivity == prevActivity) {
95+
continue;
96+
}
97+
setActivationTime(activationTime);
98+
prevActivity = activationTime.LastActivity;
99+
}
100+
}
101+
102+
// Creates (or looks up) every counter that belongs to one activity type and
// stores it in the matching slot of the per-activity tables.
//
// Aborts when activityType is not below GetActivityTypeCount().
void TActivityStats::InitCountersForActivity(ui32 activityType) {
    Y_ABORT_UNLESS(activityType < GetActivityTypeCount());

    const auto bucketName = TString(GetActivityTypeName(activityType));

    // Counter labelled activity=<bucketName> under sensor=<sensor>; `flag` is
    // passed to GetNamedCounter as is.
    const auto activityCounter = [&](const char* sensor, bool flag) {
        return Group->GetSubgroup("sensor", sensor)->GetNamedCounter("activity", bucketName, flag);
    };

    CurrentActivationTimeByActivity[activityType] = activityCounter("CurrentActivationTimeUsByActivity", false);
    ElapsedMicrosecByActivityBuckets[activityType] = activityCounter("ElapsedMicrosecByActivity", true);
    ReceivedEventsByActivityBuckets[activityType] = activityCounter("ReceivedEventsByActivity", true);
    ActorsAliveByActivityBuckets[activityType] = activityCounter("ActorsAliveByActivity", false);
    ScheduledEventsByActivityBuckets[activityType] = activityCounter("ScheduledEventsByActivity", true);
    StuckActorsByActivityBuckets[activityType] = activityCounter("StuckActorsByActivity", false);

    // Usage is split into ten bins, each addressed by an extra bin=<index> label.
    const auto usageGroup = Group->GetSubgroup("sensor", "UsageByActivity");
    auto& usageBins = UsageByActivityBuckets[activityType];
    for (ui32 bin = 0; bin < 10; ++bin) {
        usageBins[bin] = usageGroup->GetSubgroup("bin", ToString(bin))->GetNamedCounter("activity", bucketName, false);
    }
}
124+
125+
// TExecutorPoolCounters
126+
127+
// Resets the usage bookkeeping of one executor pool and registers all of its
// counters and histograms under group/"execpool"=poolName.
//
// `threads` seeds Threads, LimitThreads and DefaultThreads alike.
void TExecutorPoolCounters::Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
    // Pool identity and usage-tracking state.
    Name = poolName;
    Threads = threads;
    LimitThreads = threads;
    DefaultThreads = threads;
    LastElapsedSeconds = 0;
    Usage = 0;
    UsageTimer.Reset();

    PoolGroup = group->GetSubgroup("execpool", poolName);

    // Shorthands for the two GetCounter flavours used below; the boolean is
    // passed through untouched (presumably the "derivative" flag -- confirm).
    const auto flagged = [this](const char* name) { return PoolGroup->GetCounter(name, true); };
    const auto plain = [this](const char* name) { return PoolGroup->GetCounter(name, false); };

    SentEvents = flagged("SentEvents");
    ReceivedEvents = flagged("ReceivedEvents");
    PreemptedEvents = flagged("PreemptedEvents");
    NonDeliveredEvents = flagged("NonDeliveredEvents");
    DestroyedActors = flagged("DestroyedActors");
    CpuMicrosec = flagged("CpuMicrosec");
    ElapsedMicrosec = flagged("ElapsedMicrosec");
    ParkedMicrosec = flagged("ParkedMicrosec");
    EmptyMailboxActivation = flagged("EmptyMailboxActivation");
    ActorRegistrations = flagged("ActorRegistrations");
    ActorsAlive = plain("ActorsAlive");
    AllocatedMailboxes = plain("AllocatedMailboxes");
    MailboxPushedOutBySoftPreemption = flagged("MailboxPushedOutBySoftPreemption");
    MailboxPushedOutByTime = flagged("MailboxPushedOutByTime");
    MailboxPushedOutByEventCount = flagged("MailboxPushedOutByEventCount");
    WrongWakenedThreadCount = flagged("WrongWakenedThreadCount");
    CurrentThreadCount = plain("CurrentThreadCount");
    PotentialMaxThreadCount = plain("PotentialMaxThreadCount");
    DefaultThreadCount = plain("DefaultThreadCount");
    MaxThreadCount = plain("MaxThreadCount");

    CurrentThreadCountPercent = plain("CurrentThreadCountPercent");
    PotentialMaxThreadCountPercent = plain("PotentialMaxThreadCountPercent");
    PossibleMaxThreadCountPercent = plain("PossibleMaxThreadCountPercent");
    DefaultThreadCountPercent = plain("DefaultThreadCountPercent");
    MaxThreadCountPercent = plain("MaxThreadCountPercent");

    IsNeedy = plain("IsNeedy");
    IsStarved = plain("IsStarved");
    IsHoggish = plain("IsHoggish");
    HasFullOwnSharedThread = plain("HasFullOwnSharedThread");
    HasHalfOfOwnSharedThread = plain("HasHalfOfOwnSharedThread");
    HasHalfOfOtherSharedThread = plain("HasHalfOfOtherSharedThread");
    IncreasingThreadsByNeedyState = flagged("IncreasingThreadsByNeedyState");
    IncreasingThreadsByExchange = flagged("IncreasingThreadsByExchange");
    DecreasingThreadsByStarvedState = flagged("DecreasingThreadsByStarvedState");
    DecreasingThreadsByHoggishState = flagged("DecreasingThreadsByHoggishState");
    DecreasingThreadsByExchange = flagged("DecreasingThreadsByExchange");
    NotEnoughCpuExecutions = flagged("NotEnoughCpuExecutions");
    SpinningTimeUs = flagged("SpinningTimeUs");
    SpinThresholdUs = plain("SpinThresholdUs");

    // Each distribution is exported twice: as a legacy per-bucket counter set
    // and as a histogram sensor with the "Us" suffix.
    // NOTE(review): the legacy "EventProcessingCount" buckets are labelled
    // with "usec" as well; kept as is because the label is part of the
    // emitted sensor name.
    constexpr ui64 legacyMaxValue = 5*1000*1000;
    const auto exponentialHistogram = [this](const char* name) {
        return PoolGroup->GetHistogram(name, NMonitoring::ExponentialHistogram(24, 2, 1));
    };

    LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", legacyMaxValue);
    ActivationTimeHistogram = exponentialHistogram("ActivationTimeUs");
    LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", legacyMaxValue);
    EventDeliveryTimeHistogram = exponentialHistogram("EventDeliveryTimeUs");
    LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", legacyMaxValue);
    EventProcessingCountHistogram = exponentialHistogram("EventProcessingCountUs");
    LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", legacyMaxValue);
    EventProcessingTimeHistogram = exponentialHistogram("EventProcessingTimeUs");

    ActivityStats.Init(PoolGroup.Get());

    MaxUtilizationTime = flagged("MaxUtilizationTime");
}
198+
199+
// Publishes one snapshot of pool-level (`poolStats`) and aggregated
// thread-level (`stats`) statistics into the pool's counters and histograms,
// then refreshes the smoothed Usage estimate.
//
// Does nothing unless ACTORSLIB_COLLECT_EXEC_STATS is defined.
void TExecutorPoolCounters::Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats) {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
    const double elapsedSeconds = ::NHPTimer::GetSeconds(stats.ElapsedTicks);

    // Event / actor / mailbox counters.
    *SentEvents = stats.SentEvents;
    *ReceivedEvents = stats.ReceivedEvents;
    *PreemptedEvents = stats.PreemptedEvents;
    *NonDeliveredEvents = stats.NonDeliveredEvents;
    *DestroyedActors = stats.PoolDestroyedActors;
    *EmptyMailboxActivation = stats.EmptyMailboxActivation;
    *CpuMicrosec = stats.CpuUs;
    *ElapsedMicrosec = elapsedSeconds*1000000;
    *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
    *ActorRegistrations = stats.PoolActorRegistrations;
    *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
    *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
    *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
    *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
    *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;

    // Thread-count gauges.
    *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
    *CurrentThreadCount = poolStats.CurrentThreadCount;
    *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
    *DefaultThreadCount = poolStats.DefaultThreadCount;
    *MaxThreadCount = poolStats.MaxThreadCount;

    // The same thread counts scaled by 100. PossibleMaxThreadCountPercent is
    // fed from PotentialMaxThreadCount, exactly like PotentialMaxThreadCountPercent.
    *CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100;
    *PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
    *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
    *DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100;
    *MaxThreadCountPercent = poolStats.MaxThreadCount * 100;

    // Pool state flags and thread-count adjustment counters.
    *IsNeedy = poolStats.IsNeedy;
    *IsStarved = poolStats.IsStarved;
    *IsHoggish = poolStats.IsHoggish;

    *HasFullOwnSharedThread = poolStats.HasFullOwnSharedThread;
    *HasHalfOfOwnSharedThread = poolStats.HasHalfOfOwnSharedThread;
    *HasHalfOfOtherSharedThread = poolStats.HasHalfOfOtherSharedThread;
    *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
    *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange;
    *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
    *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
    *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
    *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;

    *SpinningTimeUs = poolStats.SpinningTimeUs;
    *SpinThresholdUs = poolStats.SpinThresholdUs;

    // Every distribution is published twice: into the legacy bucket counters
    // and into the histogram sensor, which is reset and refilled each time.
    LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
    ActivationTimeHistogram->Reset();
    ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);

    LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
    EventDeliveryTimeHistogram->Reset();
    EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);

    LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
    EventProcessingCountHistogram->Reset();
    EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);

    // NOTE(review): the conversion factor is applied to the bucket values,
    // not to the bucket bounds -- confirm that this is intended.
    const double toMicrosec = 1000000 / NHPTimer::GetClockRate();
    LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
    EventProcessingTimeHistogram->Reset();
    for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
        EventProcessingTimeHistogram->Collect(
            stats.EventProcessingTimeHistogram.UpperBound(i),
            stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
    }

    ActivityStats.Set(stats);

    *MaxUtilizationTime = poolStats.MaxUtilizationTime;

    // Time passed since the previous Set() call (the timer is restarted here).
    const double seconds = UsageTimer.PassedReset();

    // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
    Threads = poolStats.CurrentThreadCount;
    LimitThreads = poolStats.PotentialMaxThreadCount;

    // Share of the available thread-time spent executing since the last call.
    // A zero-length interval (or a pool without threads) yields a 0 sample
    // instead of Inf/NaN, which would otherwise stick in the smoothed Usage
    // forever.
    const bool canMeasure = LimitThreads > 0 && seconds > 0;
    const double currentUsage = canMeasure
        ? ((elapsedSeconds - LastElapsedSeconds) / seconds / LimitThreads)
        : 0;

    // update usage factor according to smoothness
    const double smoothness = 0.5;
    Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
    LastElapsedSeconds = elapsedSeconds;
#else
    Y_UNUSED(stats);
    Y_UNUSED(poolStats);
#endif
}
287+
288+
// TActorSystemCounters
289+
290+
// Stores the counter group and registers the actor-system-wide counters in it.
void TActorSystemCounters::Init(NMonitoring::TDynamicCounters* group) {
    Group = group;

    // All counters here are requested with the flag set to false.
    const auto plain = [this](const char* name) {
        return Group->GetCounter(name, false);
    };

    // CPU usage extremes.
    MaxUsedCpuPercent = plain("MaxUsedCpuPercent");
    MinUsedCpuPercent = plain("MinUsedCpuPercent");
    MaxElapsedCpuPercent = plain("MaxElapsedCpuPercent");
    MinElapsedCpuPercent = plain("MinElapsedCpuPercent");

    // Average wake-up latencies.
    AvgAwakeningTimeNs = plain("AvgAwakeningTimeNs");
    AvgWakingUpTimeNs = plain("AvgWakingUpTimeNs");
}
300+
301+
// Copies the harmonizer statistics into the actor-system-wide counters.
//
// Does nothing unless ACTORSLIB_COLLECT_EXEC_STATS is defined.
void TActorSystemCounters::Set(const THarmonizerStats& harmonizerStats) {
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
    // The source values are in microseconds, the counters in nanoseconds.
    constexpr int nsPerUs = 1000;
    *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * nsPerUs;
    *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * nsPerUs;

    // CPU extremes are copied as they are.
    *MinUsedCpuPercent = harmonizerStats.MinUsedCpu;
    *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu;
    *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu;
    *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu;
#else
    Y_UNUSED(harmonizerStats);
#endif
}
314+
315+
} // NActors

0 commit comments

Comments
 (0)