Skip to content

Commit 17b416b

Browse files
authored
Merge 3894a95 into ba306c3
2 parents ba306c3 + 3894a95 commit 17b416b

File tree

4 files changed

+118
-37
lines changed

4 files changed

+118
-37
lines changed

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

+112-36
Original file line numberDiff line numberDiff line change
@@ -48,25 +48,37 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
4848

4949
TCommonCounters(NMonitoring::TDynamicCounterPtr counters, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig)
5050
: CountersRoot(counters)
51-
, CountersSubgroup(counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId)))
51+
, CountersSubgroup(counters ? counters->GetSubgroup("pool", CanonizePath(TStringBuilder() << database << "/" << poolId)) : nullptr)
5252
{
5353
Register();
5454
UpdateConfigCounters(poolConfig);
5555
}
5656

57+
explicit operator bool() const {
58+
return !!CountersRoot;
59+
}
60+
5761
void CollectRequestLatency(TInstant continueTime) {
58-
if (continueTime) {
62+
if (continueTime && CountersRoot) {
5963
RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds());
6064
}
6165
}
6266

6367
void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
68+
if (!CountersRoot) {
69+
return;
70+
}
71+
6472
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
6573
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
6674
LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0));
6775
}
6876

6977
void OnCleanup() {
78+
if (!CountersRoot) {
79+
return;
80+
}
81+
7082
ActivePoolHandlers->Dec();
7183

7284
InFlightLimit->Set(0);
@@ -76,6 +88,10 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
7688

7789
private:
7890
void Register() {
91+
if (!CountersRoot) {
92+
return;
93+
}
94+
7995
ActivePoolHandlers = CountersRoot->GetCounter("ActivePoolHandlers", false);
8096
ActivePoolHandlers->Inc();
8197

@@ -202,7 +218,9 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
202218
}
203219

204220
TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second;
205-
Counters.LocalDelayedRequests->Inc();
221+
if (Counters) {
222+
Counters.LocalDelayedRequests->Inc();
223+
}
206224

207225
UpdatePoolConfig(ev->Get()->PoolConfig);
208226
UpdateSchemeboardSubscription(ev->Get()->PathId);
@@ -303,25 +321,35 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
303321
LocalInFlight++;
304322
request->Started = true;
305323
request->ContinueTime = TInstant::Now();
306-
Counters.LocalInFly->Inc();
307-
Counters.ContinueOk->Inc();
308-
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
324+
if (Counters) {
325+
Counters.LocalInFly->Inc();
326+
Counters.ContinueOk->Inc();
327+
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
328+
}
309329
LOG_D("Reply continue success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
310330
} else {
311331
if (status == Ydb::StatusIds::OVERLOADED) {
312-
Counters.ContinueOverloaded->Inc();
332+
if (Counters) {
333+
Counters.ContinueOverloaded->Inc();
334+
}
313335
LOG_I("Reply overloaded to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
314336
} else if (status == Ydb::StatusIds::CANCELLED) {
315-
Counters.Cancelled->Inc();
337+
if (Counters) {
338+
Counters.Cancelled->Inc();
339+
}
316340
LOG_I("Reply cancelled to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
317341
} else {
318-
Counters.ContinueError->Inc();
342+
if (Counters) {
343+
Counters.ContinueError->Inc();
344+
}
319345
LOG_W("Reply continue error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
320346
}
321347
RemoveRequest(request);
322348
}
323349

324-
Counters.LocalDelayedRequests->Dec();
350+
if (Counters) {
351+
Counters.LocalDelayedRequests->Dec();
352+
}
325353
}
326354

327355
void FinalReply(TRequest* request, Ydb::StatusIds::StatusCode status, const TString& message) {
@@ -340,8 +368,10 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
340368

341369
if (request->Started) {
342370
LocalInFlight--;
343-
Counters.LocalInFly->Dec();
344-
} else {
371+
if (Counters) {
372+
Counters.LocalInFly->Dec();
373+
}
374+
} else if (Counters) {
345375
Counters.LocalDelayedRequests->Dec();
346376
}
347377

@@ -428,11 +458,15 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
428458
this->Send(request->WorkerActorId, new TEvCleanupResponse(status, issues));
429459

430460
if (status == Ydb::StatusIds::SUCCESS) {
431-
Counters.CleanupOk->Inc();
432-
Counters.CollectRequestLatency(request->ContinueTime);
461+
if (Counters) {
462+
Counters.CleanupOk->Inc();
463+
Counters.CollectRequestLatency(request->ContinueTime);
464+
}
433465
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
434466
} else {
435-
Counters.CleanupError->Inc();
467+
if (Counters) {
468+
Counters.CleanupError->Inc();
469+
}
436470
LOG_W("Reply cleanup error " << status << " to " << request->WorkerActorId << ", session id: " << request->SessionId << ", issues: " << issues.ToOneLineString());
437471
}
438472
}
@@ -442,8 +476,10 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
442476
ev->Record.MutableRequest()->SetSessionId(request->SessionId);
443477
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());
444478

445-
Counters.Cancelled->Inc();
446-
Counters.CollectRequestLatency(request->ContinueTime);
479+
if (Counters) {
480+
Counters.Cancelled->Inc();
481+
Counters.CollectRequestLatency(request->ContinueTime);
482+
}
447483
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
448484
}
449485

@@ -547,34 +583,54 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
547583
using TBase = TPoolHandlerActorBase<TFifoPoolHandlerActor>;
548584

549585
struct TCounters {
586+
const NMonitoring::TDynamicCounterPtr CountersSubgroup;
587+
550588
// Fifo pool counters
551589
NMonitoring::TDynamicCounters::TCounterPtr PendingRequestsCount;
552590
NMonitoring::TDynamicCounters::TCounterPtr FinishingRequestsCount;
553591
NMonitoring::TDynamicCounters::TCounterPtr GlobalInFly;
554592
NMonitoring::TDynamicCounters::TCounterPtr GlobalDelayedRequests;
555593
NMonitoring::THistogramPtr PoolStateUpdatesBacklogMs;
556594

557-
TCounters(NMonitoring::TDynamicCounterPtr countersSubgroup) {
558-
Register(countersSubgroup);
595+
TCounters(NMonitoring::TDynamicCounterPtr countersSubgroup)
596+
: CountersSubgroup(countersSubgroup)
597+
{
598+
Register();
599+
}
600+
601+
explicit operator bool() const {
602+
return !!CountersSubgroup;
559603
}
560604

561605
void UpdateGlobalState(const TPoolStateDescription& description) {
606+
if (!CountersSubgroup) {
607+
return;
608+
}
609+
562610
GlobalInFly->Set(description.RunningRequests);
563611
GlobalDelayedRequests->Set(description.DelayedRequests);
564612
}
565613

566614
void OnCleanup() {
615+
if (!CountersSubgroup) {
616+
return;
617+
}
618+
567619
GlobalInFly->Set(0);
568620
GlobalDelayedRequests->Set(0);
569621
}
570622

571623
private:
572-
void Register(NMonitoring::TDynamicCounterPtr countersSubgroup) {
573-
PendingRequestsCount = countersSubgroup->GetCounter("PendingRequestsCount", false);
574-
FinishingRequestsCount = countersSubgroup->GetCounter("FinishingRequestsCount", false);
575-
GlobalInFly = countersSubgroup->GetCounter("GlobalInFly", false);
576-
GlobalDelayedRequests = countersSubgroup->GetCounter("GlobalDelayedRequests", false);
577-
PoolStateUpdatesBacklogMs = countersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40));
624+
void Register() {
625+
if (!CountersSubgroup) {
626+
return;
627+
}
628+
629+
PendingRequestsCount = CountersSubgroup->GetCounter("PendingRequestsCount", false);
630+
FinishingRequestsCount = CountersSubgroup->GetCounter("FinishingRequestsCount", false);
631+
GlobalInFly = CountersSubgroup->GetCounter("GlobalInFly", false);
632+
GlobalDelayedRequests = CountersSubgroup->GetCounter("GlobalDelayedRequests", false);
633+
PoolStateUpdatesBacklogMs = CountersSubgroup->GetHistogram("PoolStateUpdatesBacklogMs", NMonitoring::LinearHistogram(20, 0, 3 * LEASE_DURATION.MillisecondsFloat() / 40));
578634
}
579635
};
580636

@@ -628,7 +684,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
628684
}
629685

630686
PendingRequests.emplace_back(request->SessionId);
631-
FifoCounters.PendingRequestsCount->Inc();
687+
if (FifoCounters) {
688+
FifoCounters.PendingRequestsCount->Inc();
689+
}
632690

633691
if (!PreparingFinished) {
634692
this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPrepareTablesRequest(Database, PoolId));
@@ -710,7 +768,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
710768
});
711769

712770
PendingRequests.clear();
713-
FifoCounters.PendingRequestsCount->Set(0);
771+
if (FifoCounters) {
772+
FifoCounters.PendingRequestsCount->Set(0);
773+
}
714774
}
715775

716776
void Handle(TEvPrivate::TEvRefreshPoolStateResponse::TPtr& ev) {
@@ -723,7 +783,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
723783
return;
724784
}
725785

726-
if (LastRefreshTime) {
786+
if (LastRefreshTime && FifoCounters) {
727787
FifoCounters.PoolStateUpdatesBacklogMs->Collect((TInstant::Now() - LastRefreshTime).MilliSeconds());
728788
}
729789
LastRefreshTime = TInstant::Now();
@@ -744,7 +804,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
744804
RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
745805
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
746806
});
747-
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
807+
if (FifoCounters) {
808+
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
809+
}
748810
}
749811

750812
if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) {
@@ -774,7 +836,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
774836
}
775837

776838
GlobalState.DelayedRequests++;
777-
FifoCounters.GlobalDelayedRequests->Inc();
839+
if (FifoCounters) {
840+
FifoCounters.GlobalDelayedRequests->Inc();
841+
}
778842
LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);
779843

780844
DoStartDelayedRequest(GetLoadCpuThreshold());
@@ -849,12 +913,16 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
849913
request->UsedCpuQuota = !!GetLoadCpuThreshold();
850914
requestFound = true;
851915
GlobalState.RunningRequests++;
852-
FifoCounters.GlobalInFly->Inc();
916+
if (FifoCounters) {
917+
FifoCounters.GlobalInFly->Inc();
918+
}
853919
ReplyContinue(request);
854920
} else {
855921
// Request was dropped due to lease expiration
856922
PendingRequests.emplace_front(request->SessionId);
857-
FifoCounters.PendingRequestsCount->Inc();
923+
if (FifoCounters) {
924+
FifoCounters.PendingRequestsCount->Inc();
925+
}
858926
}
859927
});
860928
DelayedRequests.pop_front();
@@ -947,7 +1015,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
9471015
RunningOperation = true;
9481016
this->Register(CreateCleanupRequestsActor(this->SelfId(), Database, PoolId, FinishedRequests, Counters.CountersSubgroup));
9491017
FinishedRequests.clear();
950-
FifoCounters.FinishingRequestsCount->Set(0);
1018+
if (FifoCounters) {
1019+
FifoCounters.FinishingRequestsCount->Set(0);
1020+
}
9511021
}
9521022
}
9531023

@@ -971,7 +1041,9 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
9711041

9721042
RemoveFinishedRequests(PendingRequests);
9731043
RemoveFinishedRequests(DelayedRequests);
974-
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
1044+
if (FifoCounters) {
1045+
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
1046+
}
9751047
}
9761048

9771049
void RemoveFinishedRequests(std::deque<TString>& requests) {
@@ -1012,13 +1084,17 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
10121084
TString PopPendingRequest() {
10131085
TString sessionId = PendingRequests.front();
10141086
PendingRequests.pop_front();
1015-
FifoCounters.PendingRequestsCount->Dec();
1087+
if (FifoCounters) {
1088+
FifoCounters.PendingRequestsCount->Dec();
1089+
}
10161090
return sessionId;
10171091
}
10181092

10191093
void AddFinishedRequest(const TString& sessionId) {
10201094
FinishedRequests.emplace_back(sessionId);
1021-
FifoCounters.FinishingRequestsCount->Inc();
1095+
if (FifoCounters) {
1096+
FifoCounters.FinishingRequestsCount->Inc();
1097+
}
10221098
}
10231099

10241100
private:

ydb/core/kqp/workload_service/kqp_workload_service.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
7474

7575
EnabledResourcePools = AppData()->FeatureFlags.GetEnableResourcePools();
7676
EnabledResourcePoolsOnServerless = AppData()->FeatureFlags.GetEnableResourcePoolsOnServerless();
77+
EnableResourcePoolsCounters = AppData()->FeatureFlags.GetEnableResourcePoolsCounters();
7778
if (EnabledResourcePools) {
7879
InitializeWorkloadService();
7980
}
@@ -101,6 +102,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
101102

102103
EnabledResourcePools = event.GetConfig().GetFeatureFlags().GetEnableResourcePools();
103104
EnabledResourcePoolsOnServerless = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsOnServerless();
105+
EnableResourcePoolsCounters = event.GetConfig().GetFeatureFlags().GetEnableResourcePoolsCounters();
104106
if (EnabledResourcePools) {
105107
LOG_I("Resource pools was enanbled");
106108
InitializeWorkloadService();
@@ -526,7 +528,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
526528

527529
LOG_I("Creating new handler for pool " << poolKey);
528530

529-
const auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, poolConfig, Counters.Counters));
531+
const auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, poolConfig, EnableResourcePoolsCounters ? Counters.Counters : nullptr));
530532
const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second;
531533

532534
Counters.ActivePools->Inc();
@@ -560,6 +562,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
560562

561563
bool EnabledResourcePools = false;
562564
bool EnabledResourcePoolsOnServerless = false;
565+
bool EnableResourcePoolsCounters = false;
563566
bool ServiceInitialized = false;
564567
bool IdleChecksStarted = false;
565568
ETablesCreationStatus TablesCreationStatus = ETablesCreationStatus::Cleanup;

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
230230
TAppConfig GetAppConfig() const {
231231
TAppConfig appConfig;
232232
appConfig.MutableFeatureFlags()->SetEnableResourcePools(Settings_.EnableResourcePools_);
233+
appConfig.MutableFeatureFlags()->SetEnableResourcePoolsCounters(true);
233234

234235
return appConfig;
235236
}

ydb/core/protos/feature_flags.proto

+1
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,5 @@ message TFeatureFlags {
151151
optional bool EnableResourcePoolsOnServerless = 132 [default = false];
152152
optional bool EnableVectorIndex = 133 [default = false];
153153
optional bool EnableChangefeedsOnIndexTables = 134 [default = false];
154+
optional bool EnableResourcePoolsCounters = 135 [default = false];
154155
}

0 commit comments

Comments
 (0)