Skip to content

Commit 45597d3

Browse files
committed
YQ WM improved overload issues (#8437)
1 parent 7335582 commit 45597d3

File tree

6 files changed

+61
-39
lines changed

6 files changed

+61
-39
lines changed

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,14 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
6666
LoadCpuThreshold->Set(std::max(poolConfig.DatabaseLoadCpuThreshold, 0.0));
6767
}
6868

69-
void OnCleanup() {
69+
void OnCleanup(bool resetConfigCounters) {
7070
ActivePoolHandlers->Dec();
7171

72-
InFlightLimit->Set(0);
73-
QueueSizeLimit->Set(0);
74-
LoadCpuThreshold->Set(0);
72+
if (resetConfigCounters) {
73+
InFlightLimit->Set(0);
74+
QueueSizeLimit->Set(0);
75+
LoadCpuThreshold->Set(0);
76+
}
7577
}
7678

7779
private:
@@ -136,7 +138,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
136138
STRICT_STFUNC(StateFuncBase,
137139
// Workload service events
138140
sFunc(TEvents::TEvPoison, HandlePoison);
139-
sFunc(TEvPrivate::TEvStopPoolHandler, HandleStop);
141+
hFunc(TEvPrivate::TEvStopPoolHandler, Handle);
140142
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
141143
hFunc(TEvPrivate::TEvUpdatePoolSubscription, Handle);
142144

@@ -160,7 +162,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
160162

161163
SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers);
162164

163-
Counters.OnCleanup();
165+
Counters.OnCleanup(ResetCountersOnStrop);
164166

165167
TBase::PassAway();
166168
}
@@ -171,8 +173,9 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
171173
this->PassAway();
172174
}
173175

174-
void HandleStop() {
176+
void Handle(TEvPrivate::TEvStopPoolHandler::TPtr& ev) {
175177
LOG_I("Got stop pool handler request, waiting for " << LocalSessions.size() << " requests");
178+
ResetCountersOnStrop = ev->Get()->ResetCounters;
176179
if (LocalSessions.empty()) {
177180
PassAway();
178181
} else {
@@ -332,7 +335,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
332335
if (!request->Started && request->State != TRequest::EState::Finishing) {
333336
if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) {
334337
status = Ydb::StatusIds::CANCELLED;
335-
issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId);
338+
issues.AddIssue(TStringBuilder() << "Request was delayed during " << TInstant::Now() - request->StartTime << ", that is larger than delay deadline " << PoolConfig.QueryCancelAfter << " in pool " << PoolId << ", request was canceled");
336339
}
337340
ReplyContinue(request, status, issues);
338341
return;
@@ -515,6 +518,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
515518
ui64 LocalInFlight = 0;
516519
std::unordered_map<TString, TRequest> LocalSessions;
517520
bool StopHandler = false; // Stop when all requests have finished
521+
bool ResetCountersOnStrop = true;
518522
};
519523

520524

@@ -622,8 +626,13 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
622626
}
623627

624628
void OnScheduleRequest(TRequest* request) override {
625-
if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
626-
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
629+
if (PendingRequests.size() >= MAX_PENDING_REQUESTS) {
630+
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", that is larger than allowed limit " << MAX_PENDING_REQUESTS);
631+
return;
632+
}
633+
634+
if (SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
635+
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending/delayed requests is " << GetLocalSessionsCount() - GetLocalInFlight() << ", that is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
627636
return;
628637
}
629638

@@ -742,15 +751,15 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
742751

743752
if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) {
744753
RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
745-
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
754+
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local pending requests is " << PendingRequests.size() << ", number of global delayed/running requests is " << GlobalState.AmountRequests() << ", sum of them is larger than allowed limit " << QueueSizeLimit << " (including concurrent query limit " << InFlightLimit << ") for pool " << PoolId);
746755
});
747756
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
748757
}
749758

750759
if (PendingRequests.empty() && delayedRequestsCount > QueueSizeLimit) {
751-
RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this](TRequest* request) {
760+
RemoveBackRequests(DelayedRequests, delayedRequestsCount - QueueSizeLimit, [this, delayedRequestsCount](TRequest* request) {
752761
AddFinishedRequest(request->SessionId);
753-
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
762+
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Request was rejected, number of local delayed requests is " << delayedRequestsCount << ", that is larger than allowed limit " << QueueSizeLimit << " for pool " << PoolId);
754763
});
755764
}
756765

@@ -787,9 +796,10 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
787796
if (!ev->Get()->QuotaAccepted) {
788797
LOG_D("Skipped request start due to load cpu threshold");
789798
if (static_cast<EStartRequestCase>(ev->Cookie) == EStartRequestCase::Pending) {
790-
ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this](TRequest* request) {
799+
NYql::TIssues issues = GroupIssues(ev->Get()->Issues, TStringBuilder() << "Request was rejected, failed to request CPU quota for pool " << PoolId << ", current CPU threshold is " << 100.0 * ev->Get()->MaxClusterLoad << "%");
800+
ForEachUnfinished(DelayedRequests.begin(), DelayedRequests.end(), [this, issues](TRequest* request) {
791801
AddFinishedRequest(request->SessionId);
792-
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
802+
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, issues);
793803
});
794804
}
795805
RefreshState();

ydb/core/kqp/workload_service/common/events.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ struct TEvPrivate {
166166
};
167167

168168
struct TEvStopPoolHandler : public NActors::TEventLocal<TEvStopPoolHandler, EvStopPoolHandler> {
169+
explicit TEvStopPoolHandler(bool resetCounters)
170+
: ResetCounters(resetCounters)
171+
{}
172+
173+
const bool ResetCounters;
169174
};
170175

171176
struct TEvCancelRequest : public NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
@@ -196,11 +201,15 @@ struct TEvPrivate {
196201
};
197202

198203
struct TEvCpuQuotaResponse : public NActors::TEventLocal<TEvCpuQuotaResponse, EvCpuQuotaResponse> {
199-
explicit TEvCpuQuotaResponse(bool quotaAccepted)
204+
explicit TEvCpuQuotaResponse(bool quotaAccepted, double maxClusterLoad, NYql::TIssues issues)
200205
: QuotaAccepted(quotaAccepted)
206+
, MaxClusterLoad(maxClusterLoad)
207+
, Issues(std::move(issues))
201208
{}
202209

203210
const bool QuotaAccepted;
211+
const double MaxClusterLoad;
212+
const NYql::TIssues Issues;
204213
};
205214

206215
struct TEvCpuLoadResponse : public NActors::TEventLocal<TEvCpuLoadResponse, EvCpuLoadResponse> {

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
381381

382382
if (auto poolState = GetPoolState(database, poolId)) {
383383
if (poolState->NewPoolHandler) {
384-
Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler());
384+
Send(*poolState->NewPoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
385385
}
386386
poolState->NewPoolHandler = ev->Get()->NewHandler;
387387
poolState->UpdateHandler();
@@ -443,7 +443,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
443443
for (const auto& [poolKey, poolState] : PoolIdToState) {
444444
if (!poolState.InFlightRequests && TInstant::Now() - poolState.LastUpdateTime > IDLE_DURATION) {
445445
CpuQuotaManager->CleanupHandler(poolState.PoolHandler);
446-
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler());
446+
Send(poolState.PoolHandler, new TEvPrivate::TEvStopPoolHandler(true));
447447
poolsToDelete.emplace_back(poolKey);
448448
}
449449
}

ydb/core/kqp/workload_service/kqp_workload_service_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ struct TPoolState {
121121
return;
122122
}
123123

124-
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler());
124+
ActorContext.Send(PoolHandler, new TEvPrivate::TEvStopPoolHandler(false));
125125
PoolHandler = *NewPoolHandler;
126126
NewPoolHandler = std::nullopt;
127127
InFlightRequests = 0;
@@ -160,7 +160,7 @@ struct TCpuQuotaManagerState {
160160
auto response = CpuQuotaManager.RequestCpuQuota(0.0, maxClusterLoad);
161161

162162
bool quotaAccepted = response.Status == NYdb::EStatus::SUCCESS;
163-
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted), 0, coockie);
163+
ActorContext.Send(poolHandler, new TEvPrivate::TEvCpuQuotaResponse(quotaAccepted, maxClusterLoad, std::move(response.Issues)), 0, coockie);
164164

165165
// Schedule notification
166166
if (!quotaAccepted) {

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,6 @@ struct TSampleQueries {
121121
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
122122
}
123123

124-
template <typename TResult>
125-
static void CheckOverloaded(const TResult& result, const TString& poolId) {
126-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::OVERLOADED, result.GetIssues().ToString());
127-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Too many pending requests for pool " << poolId);
128-
}
129-
130124
template <typename TResult>
131125
static void CheckCancelled(const TResult& result) {
132126
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());

ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
5858
}
5959
UNIT_ASSERT_C(firstRequest.HasValue(), "One of two requests shoud be rejected");
6060
UNIT_ASSERT_C(!secondRequest.HasValue(), "One of two requests shoud be placed in pool");
61-
TSampleQueries::CheckOverloaded(firstRequest.GetResult(), ydb->GetSettings().PoolId_);
61+
62+
auto result = firstRequest.GetResult();
63+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
64+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 2, number of global delayed/running requests is 1, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
6265

6366
return secondRequest;
6467
}
@@ -114,10 +117,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
114117
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
115118
ydb->WaitQueryExecution(hangingRequest);
116119

117-
TSampleQueries::CheckOverloaded(
118-
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
119-
ydb->GetSettings().PoolId_
120-
);
120+
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
121+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
122+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
121123

122124
ydb->ContinueQueryExecution(hangingRequest);
123125
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
@@ -142,10 +144,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
142144
ydb->WaitQueryExecution(asyncResult);
143145
}
144146

145-
TSampleQueries::CheckOverloaded(
146-
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
147-
ydb->GetSettings().PoolId_
148-
);
147+
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
148+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
149+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is " << inFlight << ", sum of them is larger than allowed limit 0 (including concurrent query limit " << inFlight << ") for pool " << ydb->GetSettings().PoolId_);
149150

150151
for (const auto& asyncResult : asyncResults) {
151152
ydb->ContinueQueryExecution(asyncResult);
@@ -230,7 +231,8 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
230231

231232
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
232233
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
233-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Delay deadline exceeded in pool " << ydb->GetSettings().PoolId_);
234+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was delayed during");
235+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << ", that is larger than delay deadline 10.000000s in pool " << ydb->GetSettings().PoolId_ << ", request was canceled");
234236
}
235237

236238
Y_UNIT_TEST(TestCpuLoadThresholdRefresh) {
@@ -289,7 +291,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceDistributed) {
289291
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});
290292

291293
// Check distributed queue size
292-
TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0)), ydb->GetSettings().PoolId_);
294+
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().NodeIndex(0));
295+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
296+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 2, sum of them is larger than allowed limit 1 (including concurrent query limit 1) for pool " << ydb->GetSettings().PoolId_);
293297

294298
ydb->ContinueQueryExecution(delayedRequest);
295299
ydb->ContinueQueryExecution(hangingRequest);
@@ -359,7 +363,9 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
359363
);
360364
ydb->WaitQueryExecution(hangingRequest);
361365

362-
TSampleQueries::CheckOverloaded(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId)), poolId);
366+
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(poolId));
367+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
368+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local pending requests is 1, number of global delayed/running requests is 1, sum of them is larger than allowed limit 0 (including concurrent query limit 1) for pool " << poolId);
363369

364370
ydb->ContinueQueryExecution(hangingRequest);
365371
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
@@ -401,7 +407,10 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
401407
QUEUE_SIZE=0
402408
);
403409
)");
404-
TSampleQueries::CheckOverloaded(delayedRequest.GetResult(), ydb->GetSettings().PoolId_);
410+
411+
auto result = delayedRequest.GetResult();
412+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::OVERLOADED, result.GetIssues().ToOneLineString());
413+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Request was rejected, number of local delayed requests is 1, that is larger than allowed limit 0 for pool " << ydb->GetSettings().PoolId_);
405414

406415
ydb->ContinueQueryExecution(hangingRequest);
407416
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());

0 commit comments

Comments
 (0)