Skip to content

Commit 8f1e398

Browse files
authored
YQ kqprun pass actor system config (#7693)
1 parent bbad9a4 commit 8f1e398

File tree

12 files changed

+262
-124
lines changed

12 files changed

+262
-124
lines changed
+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#include "config_helpers.h"
2+
3+
#include <ydb/library/actors/util/affinity.h>
4+
5+
6+
namespace NKikimr {
7+
8+
namespace NActorSystemConfigHelpers {
9+
10+
namespace {
11+
12+
template <class TConfig>
13+
static TCpuMask ParseAffinity(const TConfig& cfg) {
14+
TCpuMask result;
15+
if (cfg.GetCpuList()) {
16+
result = TCpuMask(cfg.GetCpuList());
17+
} else if (cfg.GetX().size() > 0) {
18+
result = TCpuMask(cfg.GetX().data(), cfg.GetX().size());
19+
} else { // use all processors
20+
TAffinity available;
21+
available.Current();
22+
result = available;
23+
}
24+
if (cfg.GetExcludeCpuList()) {
25+
result = result - TCpuMask(cfg.GetExcludeCpuList());
26+
}
27+
return result;
28+
}
29+
30+
TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) {
31+
return systemConfig.HasSelfPingInterval()
32+
? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval())
33+
: TDuration::MilliSeconds(10);
34+
}
35+
36+
NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
37+
switch (profile) {
38+
case NKikimrConfig::TActorSystemConfig::DEFAULT:
39+
return NActors::EASProfile::Default;
40+
case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
41+
return NActors::EASProfile::LowCpuConsumption;
42+
case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
43+
return NActors::EASProfile::LowLatency;
44+
}
45+
}
46+
47+
} // anonymous namespace
48+
49+
void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters) {
50+
switch (poolConfig.GetType()) {
51+
case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: {
52+
NActors::TBasicExecutorPoolConfig basic;
53+
basic.PoolId = poolId;
54+
basic.PoolName = poolConfig.GetName();
55+
if (poolConfig.HasMaxAvgPingDeviation() && counters) {
56+
auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
57+
auto &poolInfo = cpuManager.PingInfoByPool[poolId];
58+
poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
59+
poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
60+
TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation());
61+
poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds();
62+
}
63+
basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
64+
basic.SpinThreshold = poolConfig.GetSpinThreshold();
65+
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
66+
basic.RealtimePriority = poolConfig.GetRealtimePriority();
67+
basic.HasSharedThread = poolConfig.GetHasSharedThread();
68+
if (poolConfig.HasTimePerMailboxMicroSecs()) {
69+
basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs());
70+
} else if (systemConfig.HasTimePerMailboxMicroSecs()) {
71+
basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs());
72+
}
73+
if (poolConfig.HasEventsPerMailbox()) {
74+
basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox();
75+
} else if (systemConfig.HasEventsPerMailbox()) {
76+
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
77+
}
78+
basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
79+
Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
80+
basic.MinThreadCount = poolConfig.GetMinThreads();
81+
basic.MaxThreadCount = poolConfig.GetMaxThreads();
82+
basic.DefaultThreadCount = poolConfig.GetThreads();
83+
basic.Priority = poolConfig.GetPriority();
84+
cpuManager.Basic.emplace_back(std::move(basic));
85+
break;
86+
}
87+
88+
case NKikimrConfig::TActorSystemConfig::TExecutor::IO: {
89+
NActors::TIOExecutorPoolConfig io;
90+
io.PoolId = poolId;
91+
io.PoolName = poolConfig.GetName();
92+
io.Threads = poolConfig.GetThreads();
93+
io.Affinity = ParseAffinity(poolConfig.GetAffinity());
94+
cpuManager.IO.emplace_back(std::move(io));
95+
break;
96+
}
97+
98+
default:
99+
Y_ABORT();
100+
}
101+
}
102+
103+
NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config) {
104+
const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024;
105+
Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2
106+
const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0;
107+
const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000;
108+
const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false;
109+
110+
return NActors::TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor);
111+
}
112+
113+
} // namespace NActorSystemConfigHelpers
114+
115+
} // namespace NKikimr
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#pragma once
2+
3+
#include <ydb/core/protos/config.pb.h>
4+
5+
#include <ydb/library/actors/core/config.h>
6+
7+
8+
namespace NKikimr {
9+
10+
namespace NActorSystemConfigHelpers {
11+
12+
void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig, const NKikimrConfig::TActorSystemConfig& systemConfig, ui32 poolId, NMonitoring::TDynamicCounterPtr counters);
13+
14+
NActors::TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler& config);
15+
16+
} // namespace NActorSystemConfigHelpers
17+
18+
} // namespace NKikimr

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

+4-97
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "auto_config_initializer.h"
2+
#include "config_helpers.h"
23
#include "config.h"
34
#include "kikimr_services_initializers.h"
45
#include "service_initializer.h"
@@ -277,42 +278,6 @@ IKikimrServicesInitializer::IKikimrServicesInitializer(const TKikimrRunConfig& r
277278

278279
// TBasicServicesInitializer
279280

280-
template <class TConfig>
281-
static TCpuMask ParseAffinity(const TConfig& cfg) {
282-
TCpuMask result;
283-
if (cfg.GetCpuList()) {
284-
result = TCpuMask(cfg.GetCpuList());
285-
} else if (cfg.GetX().size() > 0) {
286-
result = TCpuMask(cfg.GetX().data(), cfg.GetX().size());
287-
} else { // use all processors
288-
TAffinity available;
289-
available.Current();
290-
result = available;
291-
}
292-
if (cfg.GetExcludeCpuList()) {
293-
result = result - TCpuMask(cfg.GetExcludeCpuList());
294-
}
295-
return result;
296-
}
297-
298-
TDuration GetSelfPingInterval(const NKikimrConfig::TActorSystemConfig& systemConfig) {
299-
return systemConfig.HasSelfPingInterval()
300-
? TDuration::MicroSeconds(systemConfig.GetSelfPingInterval())
301-
: TDuration::MilliSeconds(10);
302-
}
303-
304-
305-
NActors::EASProfile ConvertActorSystemProfile(NKikimrConfig::TActorSystemConfig::EActorSystemProfile profile) {
306-
switch (profile) {
307-
case NKikimrConfig::TActorSystemConfig::DEFAULT:
308-
return NActors::EASProfile::Default;
309-
case NKikimrConfig::TActorSystemConfig::LOW_CPU_CONSUMPTION:
310-
return NActors::EASProfile::LowCpuConsumption;
311-
case NKikimrConfig::TActorSystemConfig::LOW_LATENCY:
312-
return NActors::EASProfile::LowLatency;
313-
}
314-
}
315-
316281
void AddExecutorPool(
317282
TCpuManagerConfig& cpuManager,
318283
const NKikimrConfig::TActorSystemConfig::TExecutor& poolConfig,
@@ -321,55 +286,7 @@ void AddExecutorPool(
321286
const NKikimr::TAppData* appData)
322287
{
323288
const auto counters = GetServiceCounters(appData->Counters, "utils");
324-
switch (poolConfig.GetType()) {
325-
case NKikimrConfig::TActorSystemConfig::TExecutor::BASIC: {
326-
TBasicExecutorPoolConfig basic;
327-
basic.PoolId = poolId;
328-
basic.PoolName = poolConfig.GetName();
329-
if (poolConfig.HasMaxAvgPingDeviation()) {
330-
auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
331-
auto &poolInfo = cpuManager.PingInfoByPool[poolId];
332-
poolInfo.AvgPingCounter = poolGroup->GetCounter("SelfPingAvgUs", false);
333-
poolInfo.AvgPingCounterWithSmallWindow = poolGroup->GetCounter("SelfPingAvgUsIn1s", false);
334-
TDuration maxAvgPing = GetSelfPingInterval(systemConfig) + TDuration::MicroSeconds(poolConfig.GetMaxAvgPingDeviation());
335-
poolInfo.MaxAvgPingUs = maxAvgPing.MicroSeconds();
336-
}
337-
basic.Threads = Max(poolConfig.GetThreads(), poolConfig.GetMaxThreads());
338-
basic.SpinThreshold = poolConfig.GetSpinThreshold();
339-
basic.Affinity = ParseAffinity(poolConfig.GetAffinity());
340-
basic.RealtimePriority = poolConfig.GetRealtimePriority();
341-
basic.HasSharedThread = poolConfig.GetHasSharedThread();
342-
if (poolConfig.HasTimePerMailboxMicroSecs()) {
343-
basic.TimePerMailbox = TDuration::MicroSeconds(poolConfig.GetTimePerMailboxMicroSecs());
344-
} else if (systemConfig.HasTimePerMailboxMicroSecs()) {
345-
basic.TimePerMailbox = TDuration::MicroSeconds(systemConfig.GetTimePerMailboxMicroSecs());
346-
}
347-
if (poolConfig.HasEventsPerMailbox()) {
348-
basic.EventsPerMailbox = poolConfig.GetEventsPerMailbox();
349-
} else if (systemConfig.HasEventsPerMailbox()) {
350-
basic.EventsPerMailbox = systemConfig.GetEventsPerMailbox();
351-
}
352-
basic.ActorSystemProfile = ConvertActorSystemProfile(systemConfig.GetActorSystemProfile());
353-
Y_ABORT_UNLESS(basic.EventsPerMailbox != 0);
354-
basic.MinThreadCount = poolConfig.GetMinThreads();
355-
basic.MaxThreadCount = poolConfig.GetMaxThreads();
356-
basic.DefaultThreadCount = poolConfig.GetThreads();
357-
basic.Priority = poolConfig.GetPriority();
358-
cpuManager.Basic.emplace_back(std::move(basic));
359-
break;
360-
}
361-
case NKikimrConfig::TActorSystemConfig::TExecutor::IO: {
362-
TIOExecutorPoolConfig io;
363-
io.PoolId = poolId;
364-
io.PoolName = poolConfig.GetName();
365-
io.Threads = poolConfig.GetThreads();
366-
io.Affinity = ParseAffinity(poolConfig.GetAffinity());
367-
cpuManager.IO.emplace_back(std::move(io));
368-
break;
369-
}
370-
default:
371-
Y_ABORT();
372-
}
289+
NActorSystemConfigHelpers::AddExecutorPool(cpuManager, poolConfig, systemConfig, poolId, counters);
373290
}
374291

375292
static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSystemConfig& config,
@@ -383,16 +300,6 @@ static TCpuManagerConfig CreateCpuManagerConfig(const NKikimrConfig::TActorSyste
383300
return cpuManager;
384301
}
385302

386-
static TSchedulerConfig CreateSchedulerConfig(const NKikimrConfig::TActorSystemConfig::TScheduler &config) {
387-
const ui64 resolution = config.HasResolution() ? config.GetResolution() : 1024;
388-
Y_DEBUG_ABORT_UNLESS((resolution & (resolution - 1)) == 0); // resolution must be power of 2
389-
const ui64 spinThreshold = config.HasSpinThreshold() ? config.GetSpinThreshold() : 0;
390-
const ui64 progressThreshold = config.HasProgressThreshold() ? config.GetProgressThreshold() : 10000;
391-
const bool useSchedulerActor = config.HasUseSchedulerActor() ? config.GetUseSchedulerActor() : false;
392-
393-
return TSchedulerConfig(resolution, spinThreshold, progressThreshold, useSchedulerActor);
394-
}
395-
396303
static bool IsServiceInitialized(NActors::TActorSystemSetup* setup, TActorId service)
397304
{
398305
for (auto &pr : setup->LocalServices)
@@ -601,7 +508,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
601508
setup->CpuManager = CreateCpuManagerConfig(systemConfig, appData);
602509
setup->MonitorStuckActors = systemConfig.GetMonitorStuckActors();
603510

604-
auto schedulerConfig = CreateSchedulerConfig(systemConfig.GetScheduler());
511+
auto schedulerConfig = NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler());
605512
schedulerConfig.MonCounters = GetServiceCounters(counters, "utils");
606513
setup->Scheduler.Reset(CreateSchedulerThread(schedulerConfig));
607514
setup->LocalServices.emplace_back(MakeIoDispatcherActorId(), TActorSetupCmd(CreateIoDispatcherActor(
@@ -1265,7 +1172,7 @@ void TSchedulerActorInitializer::InitializeServices(
12651172
NActors::TActorSystemSetup* setup,
12661173
const NKikimr::TAppData* appData) {
12671174
auto& systemConfig = Config.GetActorSystemConfig();
1268-
NActors::IActor *schedulerActor = CreateSchedulerActor(CreateSchedulerConfig(systemConfig.GetScheduler()));
1175+
NActors::IActor *schedulerActor = CreateSchedulerActor(NActorSystemConfigHelpers::CreateSchedulerConfig(systemConfig.GetScheduler()));
12691176
if (schedulerActor) {
12701177
NActors::TActorSetupCmd schedulerActorCmd(schedulerActor, NActors::TMailboxType::ReadAsFilled, appData->SystemPoolId);
12711178
setup->LocalServices.emplace_back(MakeSchedulerActorId(), std::move(schedulerActorCmd));

ydb/core/driver_lib/run/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ SRCS(
44
auto_config_initializer.cpp
55
config.cpp
66
config.h
7+
config_helpers.cpp
78
config_parser.cpp
89
config_parser.h
910
driver.h

ydb/core/testlib/actors/test_runtime.cpp

+19-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include <ydb/library/actors/core/executor_pool_basic.h>
1313
#include <ydb/library/actors/core/executor_pool_io.h>
14+
#include <ydb/library/actors/core/scheduler_basic.h>
1415
#include <ydb/library/actors/interconnect/interconnect_impl.h>
1516

1617
#include <ydb/core/protos/datashard_config.pb.h>
@@ -49,6 +50,11 @@ namespace NActors {
4950
NeedStatsCollectors = true;
5051
}
5152

53+
void TTestActorRuntime::SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools) {
54+
ActorSystemSetupConfig = config;
55+
ActorSystemPools = pools;
56+
}
57+
5258
TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d)
5359
: TPortManager(false)
5460
, TTestActorRuntimeBase{d}
@@ -131,7 +137,7 @@ namespace NActors {
131137
node->ActorSystem = MakeActorSystem(nodeIndex, node);
132138
node->ExecutorThread.Reset(new TExecutorThread(0, 0, node->ActorSystem.Get(), node->SchedulerPool.Get(), node->MailboxTable.Get(), "TestExecutor"));
133139
} else {
134-
node->AppData0.reset(new NKikimr::TAppData(0, 1, 2, 3, { }, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
140+
node->AppData0.reset(new NKikimr::TAppData(ActorSystemPools.SystemPoolId, ActorSystemPools.UserPoolId, ActorSystemPools.IOPoolId, ActorSystemPools.BatchPoolId, ActorSystemPools.ServicePools, app0->TypeRegistry, app0->FunctionRegistry, app0->FormatFactory, nullptr));
135141
node->ActorSystem = MakeActorSystem(nodeIndex, node);
136142
}
137143
node->LogSettings->MessagePrefix = " node " + ToString(nodeId);
@@ -219,6 +225,18 @@ namespace NActors {
219225
}
220226

221227
void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) {
228+
if (ActorSystemSetupConfig) {
229+
setup.Executors.Reset();
230+
setup.ExecutorsCount = 0;
231+
232+
setup.CpuManager = ActorSystemSetupConfig->CpuManagerConfig;
233+
setup.MonitorStuckActors = ActorSystemSetupConfig->MonitorStuckActors;
234+
235+
auto schedulerConfig = ActorSystemSetupConfig->SchedulerConfig;
236+
schedulerConfig.MonCounters = NKikimr::GetServiceCounters(node->DynamicCounters, "utils");
237+
setup.Scheduler.Reset(CreateSchedulerThread(schedulerConfig));
238+
}
239+
222240
if (NeedMonitoring && NeedStatsCollectors) {
223241
NActors::IActor* statsCollector = NKikimr::CreateStatsCollector(1, setup, node->DynamicCounters);
224242
setup.LocalServices.push_back({

ydb/core/testlib/actors/test_runtime.h

+17
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,20 @@ namespace NActors {
5353
std::vector<TIntrusivePtr<NKikimr::TControlBoard>> Icb;
5454
};
5555

56+
struct TActorSystemSetupConfig {
57+
TCpuManagerConfig CpuManagerConfig;
58+
TSchedulerConfig SchedulerConfig;
59+
bool MonitorStuckActors = false;
60+
};
61+
62+
struct TActorSystemPools {
63+
ui32 SystemPoolId = 0;
64+
ui32 UserPoolId = 1;
65+
ui32 IOPoolId = 2;
66+
ui32 BatchPoolId = 3;
67+
TMap<TString, ui32> ServicePools = {};
68+
};
69+
5670
TTestActorRuntime(THeSingleSystemEnv d);
5771
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount, bool UseRealThreads);
5872
TTestActorRuntime(ui32 nodeCount, ui32 dataCenterCount);
@@ -63,6 +77,7 @@ namespace NActors {
6377
void AddAppDataInit(std::function<void(ui32, NKikimr::TAppData&)> callback);
6478
virtual void Initialize(TEgg);
6579
void SetupStatsCollectors();
80+
void SetupActorSystemConfig(const TActorSystemSetupConfig& config, const TActorSystemPools& pools);
6681

6782
ui16 GetMonPort(ui32 nodeIndex = 0) const;
6883

@@ -125,5 +140,7 @@ namespace NActors {
125140
TActorId SleepEdgeActor;
126141
TVector<std::function<void(ui32, NKikimr::TAppData&)>> AppDataInit_;
127142
bool NeedStatsCollectors = false;
143+
std::optional<TActorSystemSetupConfig> ActorSystemSetupConfig;
144+
TActorSystemPools ActorSystemPools;
128145
};
129146
} // namespace NActors

0 commit comments

Comments
 (0)