Skip to content

Commit 27e9095

Browse files
authored
Limit v2 compute load by CPU (#861)
* Limit v2 compute load by CPU * Mon fix * Per db load config + cpu load ajustment (cashback) * Review fixes * Clear config etc
1 parent d39b863 commit 27e9095

11 files changed

+584
-13
lines changed

ydb/core/fq/libs/compute/common/utils.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32&
338338
}
339339
}
340340

341-
TString GetV1StatFromV2Plan(const TString& plan) {
341+
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
342342
TStringStream out;
343343
NYson::TYsonWriter writer(&out);
344344
writer.OnBeginMap();
@@ -358,6 +358,9 @@ TString GetV1StatFromV2Plan(const TString& plan) {
358358
totals.MaxMemoryUsage.Write(writer, "MaxMemoryUsage");
359359
totals.CpuTimeUs.Write(writer, "CpuTimeUs");
360360
totals.SourceCpuTimeUs.Write(writer, "SourceCpuTimeUs");
361+
if (cpuUsage) {
362+
*cpuUsage = (totals.CpuTimeUs.Sum + totals.SourceCpuTimeUs.Sum) / 1000000.0;
363+
}
361364
totals.InputBytes.Write(writer, "InputBytes");
362365
totals.InputRows.Write(writer, "InputRows");
363366
totals.OutputBytes.Write(writer, "OutputBytes");

ydb/core/fq/libs/compute/common/utils.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
2424
tableSettings);
2525
}
2626

27-
TString GetV1StatFromV2Plan(const TString& plan);
27+
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr);
2828
TString GetV1StatFromV2PlanV2(const TString& plan);
2929

3030
TString FormatDurationMs(ui64 durationMs);

ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.cpp

+77-4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ struct TDatabaseClients {
3838
TActorId ActorId;
3939
NConfig::TComputeDatabaseConfig Config;
4040
TActorId DatabasesCacheActorId;
41+
TActorId MonitoringActorId;
4142
};
4243

4344
std::optional<TClientConfig> GetClient(const TString& scope, const TString& endpoint, const TString& database) const {
@@ -306,6 +307,7 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
306307
switch (controlPlane.type_case()) {
307308
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
308309
case NConfig::TYdbComputeControlPlane::kSingle:
310+
CreateSingleClientActors(controlPlane.GetSingle());
309311
break;
310312
case NConfig::TYdbComputeControlPlane::kCms:
311313
CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod());
@@ -317,6 +319,16 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
317319
Become(&TComputeDatabaseControlPlaneServiceActor::StateFunc);
318320
}
319321

322+
static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TYdbStorageConfig& connection) {
323+
NCloud::TGrpcClientSettings settings;
324+
settings.Endpoint = connection.GetEndpoint();
325+
settings.EnableSsl = connection.GetUseSsl();
326+
if (connection.GetCertificateFile()) {
327+
settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
328+
}
329+
return settings;
330+
}
331+
320332
static NCloud::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
321333
NCloud::TGrpcClientSettings settings;
322334
const auto& connection = config.GetControlPlaneConnection();
@@ -328,20 +340,45 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
328340
return settings;
329341
}
330342

343+
void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
344+
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
345+
if (globalLoadConfig.GetEnable()) {
346+
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider()).release());
347+
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
348+
}
349+
}
350+
331351
void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) {
332352
const auto& mapping = cmsConfig.GetDatabaseMapping();
353+
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
333354
for (const auto& config: mapping.GetCommon()) {
334355
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
335356
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
336-
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
357+
TActorId databaseMonitoringActor;
358+
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
359+
? Config.GetYdb().GetLoadControlConfig()
360+
: globalLoadConfig;
361+
if (loadConfig.GetEnable()) {
362+
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
363+
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
364+
}
365+
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, databaseMonitoringActor});
337366
}
338367

339368
Y_ABORT_UNLESS(Clients->CommonDatabaseClients);
340369

341370
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
342371
const auto clientActor = Register(CreateCmsGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
343372
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
344-
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
373+
TActorId databaseMonitoringActor;
374+
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
375+
? Config.GetYdb().GetLoadControlConfig()
376+
: globalLoadConfig;
377+
if (loadConfig.GetEnable()) {
378+
auto clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
379+
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, Counters).release());
380+
}
381+
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, databaseMonitoringActor};
345382
}
346383
}
347384

@@ -350,20 +387,23 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
350387
for (const auto& config: mapping.GetCommon()) {
351388
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
352389
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
353-
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor});
390+
Clients->CommonDatabaseClients.push_back({clientActor, config, cacheActor, {}});
354391
}
355392

356393
Y_ABORT_UNLESS(Clients->CommonDatabaseClients);
357394

358395
for (const auto& [scope, config]: mapping.GetScopeToComputeDatabase()) {
359396
const auto clientActor = Register(CreateYdbcpGrpcClientActor(CreateGrpcClientSettings(config), CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider()).release());
360397
const auto cacheActor = Register(CreateComputeDatabasesCacheActor(clientActor, databasesCacheReloadPeriod, Counters).release());
361-
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor};
398+
Clients->ScopeToDatabaseClient[scope] = {clientActor, config, cacheActor, {}};
362399
}
363400
}
364401

365402
STRICT_STFUNC(StateFunc,
366403
hFunc(TEvYdbCompute::TEvCreateDatabaseRequest, Handle);
404+
hFunc(TEvYdbCompute::TEvCpuLoadRequest, Handle);
405+
hFunc(TEvYdbCompute::TEvCpuQuotaRequest, Handle);
406+
hFunc(TEvYdbCompute::TEvCpuQuotaAdjust, Handle);
367407
)
368408

369409
void Handle(TEvYdbCompute::TEvCreateDatabaseRequest::TPtr& ev) {
@@ -374,7 +414,38 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
374414
Register(new TCreateDatabaseRequestActor(Clients, SynchronizationServiceActorId, Config, ev));
375415
}
376416

417+
void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
418+
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
419+
if (actorId != TActorId{}) {
420+
Send(ev->Forward(actorId));
421+
} else {
422+
Send(ev->Sender, new TEvYdbCompute::TEvCpuLoadResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load monitoring disabled"}}), 0, ev->Cookie);
423+
}
424+
}
425+
426+
void Handle(TEvYdbCompute::TEvCpuQuotaRequest::TPtr& ev) {
427+
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
428+
if (actorId != TActorId{}) {
429+
Send(ev->Forward(actorId));
430+
} else {
431+
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(), 0, ev->Cookie);
432+
}
433+
}
434+
435+
void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
436+
auto actorId = GetMonitoringActorIdByScope(ev.Get()->Get()->Scope);
437+
if (actorId != TActorId{}) {
438+
Send(ev->Forward(actorId));
439+
}
440+
}
441+
377442
private:
443+
TActorId GetMonitoringActorIdByScope(const TString& scope) {
444+
return Config.GetYdb().GetControlPlane().HasSingle()
445+
? MonitoringActorId
446+
: Clients->GetClient(scope).MonitoringActorId;
447+
}
448+
378449
TActorId SynchronizationServiceActorId;
379450
NFq::NConfig::TComputeConfig Config;
380451
std::shared_ptr<TDatabaseClients> Clients;
@@ -383,6 +454,8 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
383454
TYqSharedResources::TPtr YqSharedResources;
384455
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
385456
::NMonitoring::TDynamicCounterPtr Counters;
457+
TActorId MonitoringClientActorId;
458+
TActorId MonitoringActorId;
386459
};
387460

388461
std::unique_ptr<NActors::IActor> CreateComputeDatabaseControlPlaneServiceActor(const NFq::NConfig::TComputeConfig& config,

ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h

+4
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ std::unique_ptr<NActors::IActor> CreateCmsGrpcClientActor(const NCloud::TGrpcCli
2828

2929
std::unique_ptr<NActors::IActor> CreateComputeDatabasesCacheActor(const NActors::TActorId& databaseClientActorId, const TString& databasesCacheReloadPeriod, const ::NMonitoring::TDynamicCounterPtr& counters);
3030

31+
std::unique_ptr<NActors::IActor> CreateMonitoringGrpcClientActor(const NCloud::TGrpcClientSettings& settings, const NYdb::TCredentialsProviderPtr& credentialsProvider);
32+
33+
std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters);
34+
3135
}

0 commit comments

Comments
 (0)