Skip to content

Commit 9324f59

Browse files
authored
KqpRun added CPU monitoring and yaml configs (#6675)
1 parent 209069f commit 9324f59

File tree

9 files changed

+47
-12
lines changed

9 files changed

+47
-12
lines changed

ydb/core/testlib/actors/test_runtime.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/base/appdata.h>
44
#include <ydb/core/base/blobstorage.h>
55
#include <ydb/core/base/counters.h>
6+
#include <ydb/core/base/pool_stats_collector.h>
67
#include <ydb/core/mon/sync_http_mon.h>
78
#include <ydb/core/mon/async_http_mon.h>
89
#include <ydb/core/mon_alloc/profiler.h>
@@ -43,6 +44,10 @@ namespace NActors {
4344
InitNodes();
4445
}
4546

47+
void TTestActorRuntime::SetupStatsCollectors() {
48+
NeedStatsCollectors = true;
49+
}
50+
4651
TTestActorRuntime::TTestActorRuntime(THeSingleSystemEnv d)
4752
: TPortManager(false)
4853
, TTestActorRuntimeBase{d}
@@ -212,7 +217,14 @@ namespace NActors {
212217
return MonPorts[nodeIndex];
213218
}
214219

215-
void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& /*setup*/) {
220+
void TTestActorRuntime::InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) {
221+
if (NeedMonitoring && NeedStatsCollectors) {
222+
NActors::IActor* statsCollector = NKikimr::CreateStatsCollector(1, setup, node->DynamicCounters);
223+
setup.LocalServices.push_back({
224+
TActorId(),
225+
NActors::TActorSetupCmd(statsCollector, NActors::TMailboxType::HTSwap, node->GetAppData<NKikimr::TAppData>()->SystemPoolId)
226+
});
227+
}
216228
}
217229

218230
NKikimr::TAppData& TTestActorRuntime::GetAppData(ui32 nodeIndex) {

ydb/core/testlib/actors/test_runtime.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ namespace NActors {
6262

6363
void AddAppDataInit(std::function<void(ui32, NKikimr::TAppData&)> callback);
6464
virtual void Initialize(TEgg);
65+
void SetupStatsCollectors();
6566

6667
ui16 GetMonPort(ui32 nodeIndex = 0) const;
6768

@@ -111,7 +112,7 @@ namespace NActors {
111112
private:
112113
void Initialize() override;
113114
TIntrusivePtr<::NMonitoring::TDynamicCounters> GetCountersForComponent(TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const char* component) override;
114-
void InitActorSystemSetup(TActorSystemSetup& setup) override;
115+
void InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) override;
115116

116117
TNodeData* GetNodeById(size_t idx) override {
117118
return static_cast<TNodeData*>(TTestActorRuntimeBase::GetNodeById(idx));
@@ -129,5 +130,6 @@ namespace NActors {
129130
TVector<ui16> MonPorts;
130131
TActorId SleepEdgeActor;
131132
TVector<std::function<void(ui32, NKikimr::TAppData&)>> AppDataInit_;
133+
bool NeedStatsCollectors = false;
132134
};
133135
} // namespace NActors

ydb/core/testlib/test_client.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ namespace Tests {
225225

226226
NKikimr::SetupChannelProfiles(app);
227227

228+
if (Settings->NeedStatsCollectors) {
229+
Runtime->SetupStatsCollectors();
230+
}
228231
Runtime->SetupMonitoring(Settings->MonitoringPortOffset, Settings->MonitoringTypeAsync);
229232
Runtime->SetLogBackend(Settings->LogBackend);
230233

ydb/core/testlib/test_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ namespace Tests {
109109
int GrpcMaxMessageSize = 0; // 0 - default (4_MB), -1 - no limit
110110
ui16 MonitoringPortOffset = 0;
111111
bool MonitoringTypeAsync = false;
112+
bool NeedStatsCollectors = false;
112113
NKikimrProto::TAuthConfig AuthConfig;
113114
NKikimrPQ::TPQConfig PQConfig;
114115
NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig;
@@ -164,6 +165,7 @@ namespace Tests {
164165
TServerSettings& SetGrpcPort(ui16 value) { GrpcPort = value; return *this; }
165166
TServerSettings& SetGrpcMaxMessageSize(int value) { GrpcMaxMessageSize = value; return *this; }
166167
TServerSettings& SetMonitoringPortOffset(ui16 value, bool monitoringTypeAsync = false) { MonitoringPortOffset = value; MonitoringTypeAsync = monitoringTypeAsync; return *this; }
168+
TServerSettings& SetNeedStatsCollectors(bool value) { NeedStatsCollectors = value; return *this; }
167169
TServerSettings& SetSupportsRedirect(bool value) { SupportsRedirect = value; return *this; }
168170
TServerSettings& SetTracePath(const TString& value) { TracePath = value; return *this; }
169171
TServerSettings& SetDomain(ui32 value) { Domain = value; return *this; }

ydb/library/actors/testlib/test_runtime.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1688,14 +1688,20 @@ namespace NActors {
16881688
THolder<TActorSystemSetup> setup(new TActorSystemSetup);
16891689
setup->NodeId = FirstNodeId + nodeIndex;
16901690

1691+
IHarmonizer* harmonizer = nullptr;
1692+
if (node) {
1693+
node->Harmonizer.reset(MakeHarmonizer(GetCycleCountFast()));
1694+
harmonizer = node->Harmonizer.get();
1695+
}
1696+
16911697
if (UseRealThreads) {
16921698
setup->ExecutorsCount = 5;
16931699
setup->Executors.Reset(new TAutoPtr<IExecutorPool>[5]);
1694-
setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20));
1695-
setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20));
1696-
setup->Executors[2].Reset(new TIOExecutorPool(2, 1));
1697-
setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20));
1698-
setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20));
1700+
setup->Executors[0].Reset(new TBasicExecutorPool(0, 2, 20, "System", harmonizer));
1701+
setup->Executors[1].Reset(new TBasicExecutorPool(1, 2, 20, "User", harmonizer));
1702+
setup->Executors[2].Reset(new TIOExecutorPool(2, 1, "IO"));
1703+
setup->Executors[3].Reset(new TBasicExecutorPool(3, 2, 20, "Batch", harmonizer));
1704+
setup->Executors[4].Reset(new TBasicExecutorPool(4, 1, 20, "IC", harmonizer));
16991705
setup->Scheduler.Reset(new TBasicSchedulerThread(TSchedulerConfig(512, 100)));
17001706
} else {
17011707
setup->ExecutorsCount = 1;
@@ -1704,7 +1710,7 @@ namespace NActors {
17041710
setup->Executors[0].Reset(new TExecutorPoolStub(this, nodeIndex, node, 0));
17051711
}
17061712

1707-
InitActorSystemSetup(*setup);
1713+
InitActorSystemSetup(*setup, node);
17081714

17091715
return setup;
17101716
}
@@ -1714,7 +1720,9 @@ namespace NActors {
17141720

17151721
node->ExecutorPools.resize(setup->ExecutorsCount);
17161722
for (ui32 i = 0; i < setup->ExecutorsCount; ++i) {
1717-
node->ExecutorPools[i] = setup->Executors[i].Get();
1723+
IExecutorPool* executor = setup->Executors[i].Get();
1724+
node->ExecutorPools[i] = executor;
1725+
node->Harmonizer->AddPool(executor);
17181726
}
17191727

17201728
const auto& interconnectCounters = GetCountersForComponent(node->DynamicCounters, "interconnect");

ydb/library/actors/testlib/test_runtime.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -618,8 +618,8 @@ namespace NActors {
618618

619619
THolder<TActorSystemSetup> MakeActorSystemSetup(ui32 nodeIndex, TNodeDataBase* node);
620620
THolder<TActorSystem> MakeActorSystem(ui32 nodeIndex, TNodeDataBase* node);
621-
virtual void InitActorSystemSetup(TActorSystemSetup& setup) {
622-
Y_UNUSED(setup);
621+
virtual void InitActorSystemSetup(TActorSystemSetup& setup, TNodeDataBase* node) {
622+
Y_UNUSED(setup, node);
623623
}
624624

625625
private:
@@ -703,6 +703,7 @@ namespace NActors {
703703
THolder<IExecutorPool> SchedulerPool;
704704
TVector<IExecutorPool*> ExecutorPools;
705705
THolder<TExecutorThread> ExecutorThread;
706+
std::unique_ptr<IHarmonizer> Harmonizer;
706707
};
707708

708709
struct INodeFactory {

ydb/tests/tools/kqprun/kqprun.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
#include <ydb/core/base/backtrace.h>
1515

16+
#include <ydb/library/yaml_config/yaml_config.h>
1617
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
1718
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file.h>
1819
#include <ydb/library/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.h>
@@ -305,7 +306,10 @@ class TMain : public TMainClassArgs {
305306
.DefaultValue("./configuration/app_config.conf")
306307
.Handler1([this](const NLastGetopt::TOptsParser* option) {
307308
TString file(option->CurValOrDef());
308-
if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &RunnerOptions.YdbSettings.AppConfig)) {
309+
if (file.EndsWith(".yaml")) {
310+
auto document = NKikimr::NFyaml::TDocument::Parse(LoadFile(file));
311+
RunnerOptions.YdbSettings.AppConfig = NKikimr::NYamlConfig::YamlToProto(document.Root());
312+
} else if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &RunnerOptions.YdbSettings.AppConfig)) {
309313
ythrow yexception() << "Bad format of app configuration";
310314
}
311315
});

ydb/tests/tools/kqprun/src/actors.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <ydb/core/kqp/common/events/events.h>
24
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
35

ydb/tests/tools/kqprun/src/ydb_setup.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ class TYdbSetup::TImpl {
141141
if (Settings_.MonitoringEnabled) {
142142
serverSettings.InitKikimrRunConfig();
143143
serverSettings.SetMonitoringPortOffset(Settings_.MonitoringPortOffset);
144+
serverSettings.SetNeedStatsCollectors(true);
144145
}
145146

146147
return serverSettings;

0 commit comments

Comments
 (0)