Skip to content

Commit c0da302

Browse files
authored
Merge 9a1d4d5 into 65efb82
2 parents 65efb82 + 9a1d4d5 commit c0da302

18 files changed

+152
-10
lines changed

ydb/core/base/pool_stats_collector.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
4646
void OnWakeup(const TActorContext &ctx) override {
4747
MiniKQLPoolStats.Update();
4848

49-
TVector<std::tuple<TString, double, ui32>> pools;
49+
TVector<std::tuple<TString, double, ui32, ui32>> pools;
5050
for (const auto& pool : PoolCounters) {
51-
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
51+
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
5252
}
5353

5454
ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));

ydb/core/mind/local.cpp

+30-7
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
111111
ui64 MemLimit = 0;
112112
double NodeUsage = 0;
113113

114+
TInstant LastUpdate;
115+
std::vector<TExecutorPoolState> PreviousStates;
116+
114117
bool SentDrainNode = false;
115118
bool DrainResultReceived = false;
116119
i32 PrevEstimate = 0;
@@ -278,15 +281,35 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
278281
auto& record = eventStatus->Record;
279282
record.SetStartTime(StartTime.GetValue());
280283
record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
284+
std::vector<NActors::TExecutorPoolState> poolStates;
285+
ctx.ExecutorThread.ActorSystem->GetExecutorPoolStates(poolStates);
286+
287+
TDuration passedTime = ctx.Now() - LastUpdate;
288+
LastUpdate = ctx.Now();
289+
290+
auto *actorSystemInfo = record.MutableActorSystemInfo();
291+
double cores = 0;
292+
for (ui8 poolId = 0; poolId < poolStates.size(); ++poolId) {
293+
auto &poolState = poolStates[poolId];
294+
if (poolId != AppData()->IOPoolId) {
295+
cores += poolState.MinLimit;
296+
}
297+
auto *poolInfo = actorSystemInfo->AddPools();
298+
double passedElapsedUs = poolState.ElapsedUs;
299+
if (PreviousStates.size()) {
300+
passedElapsedUs -= PreviousStates[poolId].ElapsedUs;
301+
}
302+
poolInfo->SetUsedCores(passedElapsedUs / passedTime.MicroSeconds());
303+
poolInfo->SetCurrentLimit(poolState.CurrentLimit);
304+
poolInfo->SetPossibleMaxLimit(poolState.PossibleMaxLimit);
305+
}
306+
actorSystemInfo->SetCores(cores);
281307
if (!record.GetResourceMaximum().HasCPU()) {
282-
TExecutorPoolStats poolStats;
283-
TVector<TExecutorThreadStats> statsCopy;
284-
TVector<TExecutorThreadStats> sharedStatsCopy;
285-
ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
286-
if (!statsCopy.empty()) {
287-
record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
308+
if (!poolStates.empty()) {
309+
record.MutableResourceMaximum()->SetCPU(poolStates[AppData()->UserPoolId].PossibleMaxLimit * 1000000);
288310
}
289311
}
312+
PreviousStates.swap(poolStates);
290313
if (!record.GetResourceMaximum().HasMemory()) {
291314
if (MemLimit != 0) {
292315
record.MutableResourceMaximum()->SetMemory(MemLimit);
@@ -649,7 +672,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
649672
const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
650673
if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
651674
const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
652-
UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
675+
UserPoolUsage = poolStats.usage() * poolStats.limit() * 1000000; // uS
653676
}
654677

655678
// Note: we use allocated memory because MemoryUsed(AnonRSS) has lag

ydb/core/node_whiteboard/node_whiteboard.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,13 @@ struct TEvWhiteboard{
361361
}
362362
}
363363

364-
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
364+
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
365365
for (const auto& row : poolStats) {
366366
auto& pb = *Record.AddPoolStats();
367367
pb.SetName(std::get<0>(row));
368368
pb.SetUsage(std::get<1>(row));
369369
pb.SetThreads(std::get<2>(row));
370+
pb.SetLimit(std::get<3>(row));
370371
}
371372
}
372373

ydb/core/protos/local.proto

+11
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,16 @@ message TLocalConfig {
3232
repeated NKikimrSchemeOp.TResourceProfile ResourceProfiles = 1;
3333
}
3434

35+
message TActorSystemInfo {
36+
message TPoolInfo {
37+
optional double UsedCores = 1;
38+
optional double CurrentLimit = 2;
39+
optional double PossibleMaxLimit = 3;
40+
}
41+
repeated TPoolInfo Pools = 1;
42+
optional double Cores = 2;
43+
}
44+
3545
message TEvPing {
3646
optional fixed64 HiveId = 1;
3747
optional uint32 HiveGeneration = 2;
@@ -53,6 +63,7 @@ message TEvStatus {
5363
optional uint64 AvailableWeight = 5;
5464
optional NKikimrTabletBase.TMetrics ResourceMaximum = 8;
5565
optional uint64 StartTime = 7;
66+
optional TActorSystemInfo ActorSystemInfo = 9;
5667
}
5768

5869
enum EBootMode {

ydb/core/protos/node_whiteboard.proto

+1
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ message TSystemStateInfo {
277277
optional string Name = 1;
278278
optional double Usage = 2 [(InsignificantChangePercent) = 30];
279279
optional uint32 Threads = 3;
280+
optional uint32 Limit = 4;
280281
}
281282

282283
message TEndpoint {

ydb/library/actors/core/actorsystem.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,13 @@ namespace NActors {
338338
CpuManager->Cleanup();
339339
Scheduler.Destroy();
340340
}
341+
342+
void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
343+
CpuManager->GetExecutorPoolState(poolId, state);
344+
}
345+
346+
void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
347+
CpuManager->GetExecutorPoolStates(states);
348+
}
349+
341350
}

ydb/library/actors/core/actorsystem.h

+3
Original file line numberDiff line numberDiff line change
@@ -306,5 +306,8 @@ namespace NActors {
306306
return CpuManager->GetBasicExecutorPools();
307307
}
308308

309+
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
310+
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
311+
309312
};
310313
}

ydb/library/actors/core/cpu_manager.cpp

+19
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "cpu_manager.h"
22
#include "executor_pool_jail.h"
3+
#include "mon_stats.h"
34
#include "probes.h"
45

56
#include "executor_pool_basic.h"
@@ -172,4 +173,22 @@ namespace NActors {
172173
}
173174
}
174175

176+
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
177+
if (poolId < ExecutorPoolCount) {
178+
Executors[poolId]->GetExecutorPoolState(state);
179+
}
180+
if (Shared) {
181+
TExecutorThreadStats sharedStats;
182+
Shared->GetExecutorPoolState(poolId, sharedStats);
183+
stats.Aggregate(sharedStats);
184+
}
185+
}
186+
187+
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
188+
states.resize(ExecutorPoolCount);
189+
for (i16 poolId = 0; poolId < ExecutorPoolCount; ++poolId) {
190+
GetExecutorPoolState(poolId, states[poolId]);
191+
}
192+
}
193+
175194
}

ydb/library/actors/core/cpu_manager.h

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "harmonizer.h"
66
#include "executor_pool.h"
77
#include "executor_pool_shared.h"
8+
#include "mon_stats.h"
89
#include <memory>
910

1011
namespace NActors {
@@ -47,6 +48,8 @@ namespace NActors {
4748
}
4849

4950
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
51+
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
52+
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
5053

5154
THarmonizerStats GetHarmonizerStats() const {
5255
if (Harmonizer) {

ydb/library/actors/core/executor_pool.h

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace NActors {
99
struct TMailboxHeader;
1010
struct TWorkerContext;
1111
struct TExecutorPoolStats;
12+
struct TExecutorPoolState;
1213
struct TExecutorThreadStats;
1314
class TExecutorPoolJail;
1415
class ISchedulerCookie;
@@ -108,6 +109,10 @@ namespace NActors {
108109
Y_UNUSED(statsCopy);
109110
}
110111

112+
virtual void GetExecutorPoolState(TExecutorPoolState &poolState) {
113+
Y_UNUSED(poolState);
114+
}
115+
111116
virtual TString GetName() const {
112117
return TString();
113118
}

ydb/library/actors/core/executor_pool_basic.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "mailbox.h"
1010
#include "thread_context.h"
1111
#include <atomic>
12+
#include <memory>
1213
#include <ydb/library/actors/util/affinity.h>
1314
#include <ydb/library/actors/util/datetime.h>
1415

@@ -425,6 +426,26 @@ namespace NActors {
425426
}
426427
}
427428

429+
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
430+
ui64 ticks = 0;
431+
for (i16 i = 0; i < PoolThreads; ++i) {
432+
TExecutorThreadStats stats;
433+
Threads[i].Thread->GetCurrentStatsForHarmonizer(stats);
434+
ticks += stats.SafeElapsedTicks;
435+
}
436+
poolState.ElapsedUs = Ts2Us(ticks);
437+
poolState.CurrentLimit = GetThreadCount();
438+
poolState.MaxLimit = GetMaxThreadCount();
439+
poolState.MinLimit = GetDefaultThreadCoun();
440+
poolState.PossibleLimit = PoolThreads;
441+
if (Harmonizer) {
442+
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
443+
poolState.PossibleLimit = stats.PotentialMaxThreadCount;
444+
} else {
445+
poolState.PossibleLimit = poolState.MaxLimit;
446+
}
447+
}
448+
428449
void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
429450
TAffinityGuard affinityGuard(Affinity());
430451

ydb/library/actors/core/executor_pool_basic.h

+1
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ namespace NActors {
251251
void Shutdown() override;
252252

253253
void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
254+
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
254255
TString GetName() const override {
255256
return PoolName;
256257
}

ydb/library/actors/core/executor_pool_io.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,20 @@ namespace NActors {
148148
}
149149
}
150150

151+
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
152+
ui64 ticks = 0;
153+
for (i16 i = 0; i < PoolThreads; ++i) {
154+
TExecutorThreadStats stats;
155+
Threads[i].Thread->GetCurrentStatsForHarmonizer(stats);
156+
ticks += stats.SafeElapsedTicks;
157+
}
158+
poolState.ElapsedUs = Ts2Us(ticks);
159+
poolState.CurrentLimit = PoolThreads;
160+
poolState.MaxLimit = PoolThreads;
161+
poolState.MinLimit = PoolThreads;
162+
poolState.PossibleLimit = PoolThreads;
163+
}
164+
151165
TString TIOExecutorPool::GetName() const {
152166
return PoolName;
153167
}

ydb/library/actors/core/executor_pool_io.h

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ namespace NActors {
4242
void Shutdown() override;
4343

4444
void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
45+
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
4546
TString GetName() const override;
4647
};
4748
}

ydb/library/actors/core/executor_pool_shared.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,16 @@ void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThread
149149
}
150150
}
151151

152+
void TSharedExecutorPool::GetExecutorPoolState(i16 poolId, TExecutorPoolState &poolState) const {
153+
ui64 ticks = 0;
154+
for (i16 i = 0; i < PoolThreads; ++i) {
155+
TExecutorThreadStats stats;
156+
Threads[i].Thread->GetCurrentStatsForHarmonizer(poolId, stats);
157+
ticks += stats.SafeElapsedTicks;
158+
}
159+
poolState.ElapsedUs = Ts2Us(ticks);
160+
}
161+
152162
void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
153163
statsCopy.resize(SharedThreadCount + 1);
154164
for (i16 i = 0; i < SharedThreadCount; ++i) {

ydb/library/actors/core/executor_pool_shared.h

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ namespace NActors {
4848
i16 GetSharedThreadCount() const;
4949

5050
TSharedPoolState GetState() const;
51+
void GetExecutorPoolState(i16 pool, TExecutorPoolState &poolState) const;
5152

5253
private:
5354
TSharedPoolState State;

ydb/library/actors/core/mon_stats.h

+16
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,22 @@ namespace NActors {
4040
ui64 Buckets[65];
4141
};
4242

43+
struct TExecutorPoolState {
44+
double ElapsedUs = 0;
45+
double CurrentLimit = 0;
46+
double PossibleMaxLimit = 0;
47+
double MaxLimit = 0;
48+
double MinLimit = 0;
49+
50+
void Aggregate(const TExecutorPoolState& other) {
51+
ElapsedUs += other.ElapsedUs;
52+
CurrentLimit += other.CurrentLimit;
53+
PossibleMaxLimit += other.PossibleMaxLimit;
54+
MaxLimit += other.MaxLimit;
55+
MinLimit += other.MinLimit;
56+
}
57+
};
58+
4359
struct TExecutorPoolStats {
4460
ui64 MaxUtilizationTime = 0;
4561
ui64 IncreasingThreadsByNeedyState = 0;

ydb/library/actors/helpers/pool_stats_collector.h

+3
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,15 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
214214
THPTimer UsageTimer;
215215
TString Name;
216216
double Threads;
217+
double LimitThreads;
217218

218219
void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
219220
LastElapsedSeconds = 0;
220221
Usage = 0;
221222
UsageTimer.Reset();
222223
Name = poolName;
223224
Threads = threads;
225+
LimitThreads = threads;
224226

225227
PoolGroup = group->GetSubgroup("execpool", poolName);
226228

@@ -374,6 +376,7 @@ class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
374376
Y_UNUSED(stats);
375377
#endif
376378
Threads = poolStats.CurrentThreadCount;
379+
LimitThreads = poolStats.PotentialMaxThreadCount;
377380
}
378381
};
379382

0 commit comments

Comments
 (0)