Skip to content

Commit db92b74

Browse files
authored
YQ-3458 fix requests starts for queue size zero (#6949)
1 parent 9275a9e commit db92b74

File tree

5 files changed

+57
-20
lines changed

5 files changed

+57
-20
lines changed

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
577577
}
578578

579579
void OnScheduleRequest(TRequest* request) override {
580-
if (PendingRequests.size() >= MAX_PENDING_REQUESTS || GetLocalSessionsCount() - GetLocalInFlight() > QueueSizeLimit + 1) {
580+
if (PendingRequests.size() >= MAX_PENDING_REQUESTS || SaturationSub(GetLocalSessionsCount() - GetLocalInFlight(), InFlightLimit) > QueueSizeLimit) {
581581
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
582582
return;
583583
}
@@ -695,8 +695,8 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
695695
size_t delayedRequestsCount = DelayedRequests.size();
696696
DoStartPendingRequest(GetLoadCpuThreshold());
697697

698-
if (GlobalState.DelayedRequests + PendingRequests.size() > QueueSizeLimit) {
699-
RemoveBackRequests(PendingRequests, std::min(GlobalState.DelayedRequests + PendingRequests.size() - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
698+
if (const ui64 delayedRequests = SaturationSub(GlobalState.AmountRequests() + PendingRequests.size(), InFlightLimit); delayedRequests > QueueSizeLimit) {
699+
RemoveBackRequests(PendingRequests, std::min(delayedRequests - QueueSizeLimit, PendingRequests.size()), [this](TRequest* request) {
700700
ReplyContinue(request, Ydb::StatusIds::OVERLOADED, TStringBuilder() << "Too many pending requests for pool " << PoolId);
701701
});
702702
FifoCounters.PendingRequestsCount->Set(PendingRequests.size());
@@ -847,7 +847,7 @@ class TFifoPoolHandlerActor : public TPoolHandlerActorBase<TFifoPoolHandlerActor
847847
}
848848

849849
bool canStartRequest = QueueSizeLimit == 0 && GlobalState.RunningRequests < InFlightLimit;
850-
canStartRequest |= !GetLoadCpuThreshold() && NodeCount && GlobalState.RunningRequests + NodeCount < InFlightLimit;
850+
canStartRequest |= !GetLoadCpuThreshold() && DelayedRequests.size() + GlobalState.DelayedRequests == 0 && NodeCount && GlobalState.RunningRequests + NodeCount < InFlightLimit;
851851
if (!PendingRequests.empty() && canStartRequest) {
852852
RunningOperation = true;
853853
const TString& sessionId = PopPendingRequest();

ydb/core/kqp/workload_service/common/helpers.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& descript
2020
}
2121
}
2222

23+
ui64 SaturationSub(ui64 x, ui64 y) {
24+
return (x > y) ? x - y : 0;
25+
}
26+
2327
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/workload_service/common/helpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,6 @@ NYql::TIssues GroupIssues(const NYql::TIssues& issues, const TString& message);
9999

100100
void ParsePoolSettings(const NKikimrSchemeOp::TResourcePoolDescription& description, NResourcePool::TPoolSettings& poolConfig);
101101

102+
ui64 SaturationSub(ui64 x, ui64 y);
103+
102104
} // NKikimr::NKqp::NWorkload

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,10 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
453453
auto subgroup = GetWorkloadManagerCounters(nodeIndex)
454454
->GetSubgroup("pool", CanonizePath(TStringBuilder() << Settings_.DomainName_ << "/" << (poolId ? poolId : Settings_.PoolId_)));
455455

456-
CheckCommonCounters(subgroup);
456+
const TString description = TStringBuilder() << "Node index: " << nodeIndex;
457+
CheckCommonCounters(subgroup, description);
457458
if (checkTableCounters) {
458-
CheckTableCounters(subgroup);
459+
CheckTableCounters(subgroup, description);
459460
}
460461
}
461462
}
@@ -497,21 +498,21 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
497498
->GetSubgroup("subsystem", "workload_manager");
498499
}
499500

500-
static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup) {
501-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalInFly", false)->Val(), 0);
502-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0);
503-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0);
504-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueError", true)->Val(), 0);
505-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("CleanupError", true)->Val(), 0);
506-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("Cancelled", true)->Val(), 0);
501+
static void CheckCommonCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) {
502+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalInFly", false)->Val(), 0, description);
503+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("LocalDelayedRequests", false)->Val(), 0, description);
504+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueOverloaded", true)->Val(), 0, description);
505+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueError", true)->Val(), 0, description);
506+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("CleanupError", true)->Val(), 0, description);
507+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("Cancelled", true)->Val(), 0, description);
507508

508-
UNIT_ASSERT_GE(subgroup->GetCounter("ContinueOk", true)->Val(), 1);
509-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val());
509+
UNIT_ASSERT_GE_C(subgroup->GetCounter("ContinueOk", true)->Val(), 1, description);
510+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("ContinueOk", true)->Val(), subgroup->GetCounter("CleanupOk", true)->Val(), description);
510511
}
511512

512-
static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup) {
513-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0);
514-
UNIT_ASSERT_VALUES_EQUAL(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0);
513+
static void CheckTableCounters(NMonitoring::TDynamicCounterPtr subgroup, const TString& description) {
514+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("PendingRequestsCount", false)->Val(), 0, description);
515+
UNIT_ASSERT_VALUES_EQUAL_C(subgroup->GetCounter("FinishingRequestsCount", false)->Val(), 0, description);
515516

516517
const std::vector<std::pair<TString, bool>> tableQueries = {
517518
{"TCleanupTablesQuery", false},
@@ -524,9 +525,9 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
524525
for (const auto& [operation, runExpected] : tableQueries) {
525526
auto operationSubgroup = subgroup->GetSubgroup("operation", operation);
526527

527-
UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << "Unexpected vaule for operation " << operation);
528+
UNIT_ASSERT_VALUES_EQUAL_C(operationSubgroup->GetCounter("FinishError", true)->Val(), 0, TStringBuilder() << description << ", unexpected vaule for operation " << operation);
528529
if (runExpected) {
529-
UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << "Unexpected vaule for operation " << operation);
530+
UNIT_ASSERT_GE_C(operationSubgroup->GetCounter("FinishOk", true)->Val(), 1, TStringBuilder() << description << ", unexpected vaule for operation " << operation);
530531
}
531532
}
532533
}

ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,36 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
123123
TSampleQueries::TSelect42::CheckResult(hangingRequest.GetResult());
124124
}
125125

126+
Y_UNIT_TEST(TestZeroQueueSizeManyQueries) {
127+
const i32 inFlight = 10;
128+
auto ydb = TYdbSetupSettings()
129+
.ConcurrentQueryLimit(inFlight)
130+
.QueueSize(0)
131+
.QueryCancelAfter(FUTURE_WAIT_TIMEOUT * inFlight)
132+
.Create();
133+
134+
auto settings = TQueryRunnerSettings().HangUpDuringExecution(true);
135+
136+
std::vector<TQueryRunnerResultAsync> asyncResults;
137+
for (size_t i = 0; i < inFlight; ++i) {
138+
asyncResults.emplace_back(ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, settings));
139+
}
140+
141+
for (const auto& asyncResult : asyncResults) {
142+
ydb->WaitQueryExecution(asyncResult);
143+
}
144+
145+
TSampleQueries::CheckOverloaded(
146+
ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false)),
147+
ydb->GetSettings().PoolId_
148+
);
149+
150+
for (const auto& asyncResult : asyncResults) {
151+
ydb->ContinueQueryExecution(asyncResult);
152+
TSampleQueries::TSelect42::CheckResult(asyncResult.GetResult());
153+
}
154+
}
155+
126156
Y_UNIT_TEST(TestQueryCancelAfterUnlimitedPool) {
127157
auto ydb = TYdbSetupSettings()
128158
.QueryCancelAfter(TDuration::Seconds(10))

0 commit comments

Comments
 (0)