Skip to content

Commit 51835ea

Browse files
authored
Merge bfe24b2 into 66d836f
2 parents 66d836f + bfe24b2 commit 51835ea

19 files changed

+159
-63
lines changed

ydb/core/base/pool_stats_collector.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
4646
void OnWakeup(const TActorContext &ctx) override {
4747
MiniKQLPoolStats.Update();
4848

49-
TVector<std::tuple<TString, double, ui32>> pools;
49+
TVector<std::tuple<TString, double, ui32, ui32>> pools;
5050
for (const auto& pool : PoolCounters) {
51-
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
51+
pools.emplace_back(pool.Name, pool.Usage, pool.Threads, pool.LimitThreads);
5252
}
5353

5454
ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));

ydb/core/mind/hive/tx__update_tablet_metrics.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TTxUpdateTabletMetrics : public TTransactionBase<THive> {
5454
}
5555
TNodeInfo* node = Self->FindNode(nodeId);
5656
if (node != nullptr) {
57+
node->UpdateResourceMaximum(record.GetResourceMaximum());
5758
node->UpdateResourceTotalUsage(record);
5859
node->Statistics.SetLastAliveTimestamp(now.MilliSeconds());
5960
node->ActualizeNodeStatistics(now);

ydb/core/mind/local.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
109109
ui64 UserPoolUsage = 0; // (usage uS x threads) / sec
110110
ui64 MemUsage = 0;
111111
ui64 MemLimit = 0;
112+
ui64 CpuLimit = 0; // PotentialMaxThreadCount of UserPool
112113
double NodeUsage = 0;
113114

114115
bool SentDrainNode = false;
@@ -272,28 +273,28 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
272273
HandlePipeDestroyed(ctx);
273274
}
274275

275-
void SendStatusOk(const TActorContext &ctx) {
276-
LOG_DEBUG_S(ctx, NKikimrServices::LOCAL, "TLocalNodeRegistrar SendStatusOk");
277-
TAutoPtr<TEvLocal::TEvStatus> eventStatus = new TEvLocal::TEvStatus(TEvLocal::TEvStatus::StatusOk);
278-
auto& record = eventStatus->Record;
279-
record.SetStartTime(StartTime.GetValue());
280-
record.MutableResourceMaximum()->CopyFrom(ResourceLimit);
281-
if (!record.GetResourceMaximum().HasCPU()) {
282-
TExecutorPoolStats poolStats;
283-
TVector<TExecutorThreadStats> statsCopy;
284-
TVector<TExecutorThreadStats> sharedStatsCopy;
285-
ctx.ExecutorThread.ActorSystem->GetPoolStats(AppData()->UserPoolId, poolStats, statsCopy, sharedStatsCopy);
286-
if (!statsCopy.empty()) {
287-
record.MutableResourceMaximum()->SetCPU(poolStats.CurrentThreadCount * 1000000);
276+
void FillResourceMaximum(NKikimrTabletBase::TMetrics* record) {
277+
record->CopyFrom(ResourceLimit);
278+
if (!record->HasCPU()) {
279+
if (CpuLimit != 0) {
280+
record->SetCPU(CpuLimit);
288281
}
289282
}
290-
if (!record.GetResourceMaximum().HasMemory()) {
283+
if (!record->HasMemory()) {
291284
if (MemLimit != 0) {
292-
record.MutableResourceMaximum()->SetMemory(MemLimit);
285+
record->SetMemory(MemLimit);
293286
} else {
294-
record.MutableResourceMaximum()->SetMemory(NSystemInfo::TotalMemorySize());
287+
record->SetMemory(NSystemInfo::TotalMemorySize());
295288
}
296289
}
290+
}
291+
292+
void SendStatusOk(const TActorContext &ctx) {
293+
LOG_DEBUG_S(ctx, NKikimrServices::LOCAL, "TLocalNodeRegistrar SendStatusOk");
294+
TAutoPtr<TEvLocal::TEvStatus> eventStatus = new TEvLocal::TEvStatus(TEvLocal::TEvStatus::StatusOk);
295+
auto& record = eventStatus->Record;
296+
record.SetStartTime(StartTime.GetValue());
297+
FillResourceMaximum(record.MutableResourceMaximum());
297298
NTabletPipe::SendData(ctx, HivePipeClient, eventStatus.Release());
298299
}
299300

@@ -587,6 +588,7 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
587588
record.MutableTotalResourceUsage()->SetMemory(MemUsage);
588589
}
589590
record.SetTotalNodeUsage(NodeUsage);
591+
FillResourceMaximum(record.MutableResourceMaximum());
590592
NTabletPipe::SendData(ctx, HivePipeClient, event.Release());
591593
SendTabletMetricsTime = ctx.Now();
592594
} else {
@@ -649,7 +651,8 @@ class TLocalNodeRegistrar : public TActorBootstrapped<TLocalNodeRegistrar> {
649651
const NKikimrWhiteboard::TSystemStateInfo& info = record.GetSystemStateInfo(0);
650652
if (static_cast<ui32>(info.PoolStatsSize()) > AppData()->UserPoolId) {
651653
const auto& poolStats(info.GetPoolStats(AppData()->UserPoolId));
652-
UserPoolUsage = poolStats.usage() * poolStats.threads() * 1000000; // uS
654+
CpuLimit = poolStats.limit() * 1'000'000; // microseconds
655+
UserPoolUsage = poolStats.usage() * CpuLimit; // microseconds
653656
}
654657

655658
// Note: we use allocated memory because MemoryUsed(AnonRSS) has lag

ydb/core/node_whiteboard/node_whiteboard.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,13 @@ struct TEvWhiteboard{
361361
}
362362
}
363363

364-
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32>>& poolStats) {
364+
TEvSystemStateUpdate(const TVector<std::tuple<TString, double, ui32, ui32>>& poolStats) {
365365
for (const auto& row : poolStats) {
366366
auto& pb = *Record.AddPoolStats();
367367
pb.SetName(std::get<0>(row));
368368
pb.SetUsage(std::get<1>(row));
369369
pb.SetThreads(std::get<2>(row));
370+
pb.SetLimit(std::get<3>(row));
370371
}
371372
}
372373

ydb/core/protos/hive.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ message TEvTabletMetrics {
246246
repeated TTabletMetrics TabletMetrics = 1;
247247
optional NKikimrTabletBase.TMetrics TotalResourceUsage = 2;
248248
optional double TotalNodeUsage = 3;
249+
optional NKikimrTabletBase.TMetrics ResourceMaximum = 4;
249250
}
250251

251252
message TEvReassignTablet {

ydb/core/protos/node_whiteboard.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ message TSystemStateInfo {
277277
optional string Name = 1;
278278
optional double Usage = 2 [(InsignificantChangePercent) = 30];
279279
optional uint32 Threads = 3;
280+
optional uint32 Limit = 4;
280281
}
281282

282283
message TEndpoint {

ydb/library/actors/core/actorsystem.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,4 +338,13 @@ namespace NActors {
338338
CpuManager->Cleanup();
339339
Scheduler.Destroy();
340340
}
341+
342+
void TActorSystem::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
343+
CpuManager->GetExecutorPoolState(poolId, state);
344+
}
345+
346+
void TActorSystem::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
347+
CpuManager->GetExecutorPoolStates(states);
348+
}
349+
341350
}

ydb/library/actors/core/actorsystem.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,5 +306,8 @@ namespace NActors {
306306
return CpuManager->GetBasicExecutorPools();
307307
}
308308

309+
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
310+
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
311+
309312
};
310313
}

ydb/library/actors/core/cpu_manager.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "cpu_manager.h"
22
#include "executor_pool_jail.h"
3+
#include "mon_stats.h"
34
#include "probes.h"
45

56
#include "executor_pool_basic.h"
@@ -172,4 +173,17 @@ namespace NActors {
172173
}
173174
}
174175

176+
void TCpuManager::GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const {
177+
if (static_cast<ui32>(poolId) < ExecutorPoolCount) {
178+
Executors[poolId]->GetExecutorPoolState(state);
179+
}
180+
}
181+
182+
void TCpuManager::GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const {
183+
states.resize(ExecutorPoolCount);
184+
for (i16 poolId = 0; poolId < static_cast<ui16>(ExecutorPoolCount); ++poolId) {
185+
GetExecutorPoolState(poolId, states[poolId]);
186+
}
187+
}
188+
175189
}

ydb/library/actors/core/cpu_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "harmonizer.h"
66
#include "executor_pool.h"
77
#include "executor_pool_shared.h"
8+
#include "mon_stats.h"
89
#include <memory>
910

1011
namespace NActors {
@@ -47,6 +48,8 @@ namespace NActors {
4748
}
4849

4950
void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy, TVector<TExecutorThreadStats>& sharedStatsCopy) const;
51+
void GetExecutorPoolState(i16 poolId, TExecutorPoolState &state) const;
52+
void GetExecutorPoolStates(std::vector<TExecutorPoolState> &states) const;
5053

5154
THarmonizerStats GetHarmonizerStats() const {
5255
if (Harmonizer) {

ydb/library/actors/core/executor_pool.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ namespace NActors {
99
struct TMailboxHeader;
1010
struct TWorkerContext;
1111
struct TExecutorPoolStats;
12+
struct TExecutorPoolState;
1213
struct TExecutorThreadStats;
1314
class TExecutorPoolJail;
1415
class ISchedulerCookie;
@@ -108,6 +109,10 @@ namespace NActors {
108109
Y_UNUSED(statsCopy);
109110
}
110111

112+
virtual void GetExecutorPoolState(TExecutorPoolState &poolState) const {
113+
Y_UNUSED(poolState);
114+
}
115+
111116
virtual TString GetName() const {
112117
return TString();
113118
}

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "mailbox.h"
1010
#include "thread_context.h"
1111
#include <atomic>
12+
#include <memory>
1213
#include <ydb/library/actors/util/affinity.h>
1314
#include <ydb/library/actors/util/datetime.h>
1415

@@ -425,6 +426,19 @@ namespace NActors {
425426
}
426427
}
427428

429+
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
430+
if (Harmonizer) {
431+
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
432+
poolState.UsedCpu = stats.AvgConsumedCpu;
433+
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
434+
} else {
435+
poolState.PossibleMaxLimit = poolState.MaxLimit;
436+
}
437+
poolState.CurrentLimit = GetThreadCount();
438+
poolState.MaxLimit = GetMaxThreadCount();
439+
poolState.MinLimit = GetDefaultThreadCount();
440+
}
441+
428442
void TBasicExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
429443
TAffinityGuard affinityGuard(Affinity());
430444

ydb/library/actors/core/executor_pool_basic.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ namespace NActors {
251251
void Shutdown() override;
252252

253253
void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
254+
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
254255
TString GetName() const override {
255256
return PoolName;
256257
}

ydb/library/actors/core/executor_pool_io.cpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ namespace NActors {
1212
, PoolName(poolName)
1313
{}
1414

15-
TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg)
15+
TIOExecutorPool::TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer)
1616
: TIOExecutorPool(
1717
cfg.PoolId,
1818
cfg.Threads,
1919
cfg.PoolName,
2020
new TAffinity(cfg.Affinity)
2121
)
22-
{}
22+
{
23+
Harmonizer = harmonizer;
24+
}
2325

2426
TIOExecutorPool::~TIOExecutorPool() {
2527
Threads.Destroy();
@@ -148,6 +150,17 @@ namespace NActors {
148150
}
149151
}
150152

153+
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
154+
if (Harmonizer) {
155+
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
156+
poolState.UsedCpu = stats.AvgConsumedCpu;
157+
}
158+
poolState.CurrentLimit = PoolThreads;
159+
poolState.MaxLimit = PoolThreads;
160+
poolState.MinLimit = PoolThreads;
161+
poolState.PossibleMaxLimit = PoolThreads;
162+
}
163+
151164
TString TIOExecutorPool::GetName() const {
152165
return PoolName;
153166
}

ydb/library/actors/core/executor_pool_io.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "actorsystem.h"
44
#include "executor_thread.h"
55
#include "executor_thread_ctx.h"
6+
#include "harmonizer.h"
67
#include "scheduler_queue.h"
78
#include "executor_pool_base.h"
89
#include <ydb/library/actors/actor_type/indexes.h>
@@ -20,12 +21,13 @@ namespace NActors {
2021

2122
THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
2223
TTicketLock ScheduleLock;
24+
IHarmonizer *Harmonizer = nullptr;
2325

2426
const TString PoolName;
2527
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
2628
public:
2729
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr);
28-
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg);
30+
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr);
2931
~TIOExecutorPool();
3032

3133
ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) override;
@@ -42,6 +44,7 @@ namespace NActors {
4244
void Shutdown() override;
4345

4446
void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const override;
47+
void GetExecutorPoolState(TExecutorPoolState &poolState) const override;
4548
TString GetName() const override;
4649
};
4750
}

0 commit comments

Comments
 (0)