Skip to content

Commit a037131

Browse files
authored
YQ-3345 fixed WM counters and unit test (#6643)
1 parent 2d3caea commit a037131

8 files changed

+253
-123
lines changed

ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp

Lines changed: 163 additions & 89 deletions
Large diffs are not rendered by default.

ydb/core/kqp/workload_service/common/events.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ struct TEvPrivate {
3333
EvCpuQuotaRequest,
3434
EvCpuQuotaResponse,
3535
EvCpuLoadResponse,
36+
EvNodesInfoRequest,
37+
EvNodesInfoResponse,
3638

3739
EvTablesCreationFinished,
3840
EvCleanupTableResponse,
@@ -183,6 +185,17 @@ struct TEvPrivate {
183185
const NYql::TIssues Issues;
184186
};
185187

188+
// Payload-free local event; the workload service answers it with
// TEvNodesInfoResponse.
struct TEvNodesInfoRequest : public NActors::TEventLocal<TEvNodesInfoRequest, EvNodesInfoRequest> {};
190+
191+
// Local event carrying a node count; immutable once constructed.
struct TEvNodesInfoResponse : public NActors::TEventLocal<TEvNodesInfoResponse, EvNodesInfoResponse> {
    // Node count supplied by the sender at construction time.
    const ui32 NodeCount;

    explicit TEvNodesInfoResponse(ui32 nodeCount)
        : NodeCount(nodeCount)
    {}
};
198+
186199
// Tables queries events
187200
struct TEvTablesCreationFinished : public NActors::TEventLocal<TEvTablesCreationFinished, EvTablesCreationFinished> {
188201
TEvTablesCreationFinished(bool success, NYql::TIssues issues)

ydb/core/kqp/workload_service/kqp_workload_service.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
#include <ydb/core/protos/console_config.pb.h>
1616

17+
#include <ydb/library/actors/interconnect/interconnect.h>
18+
1719

1820
namespace NKikimr::NKqp {
1921

@@ -34,7 +36,8 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
3436

3537
enum class EWakeUp {
3638
IdleCheck,
37-
StartCpuLoadRequest
39+
StartCpuLoadRequest,
40+
StartNodeInfoRequest
3841
};
3942

4043
public:
@@ -92,6 +95,13 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
9295
Send(ev->Sender, responseEvent.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
9396
}
9497

98+
// Handles a TEvNodesInfo event (expected as the reply to the TEvListNodes
// request sent by RunNodeInfoRequest): stores the number of reported nodes,
// arms the next periodic refresh and traces the new value.
void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
    NodeCount = ev->Get()->Nodes.size();
    ScheduleNodeInfoRequest();

    LOG_T("Updated node info, node count: " << NodeCount);
}
104+
95105
void Handle(TEvents::TEvUndelivered::TPtr& ev) const {
96106
switch (ev->Get()->SourceType) {
97107
case NConsole::TEvConfigsDispatcher::EvSetConfigSubscriptionRequest:
@@ -102,6 +112,11 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
102112
LOG_E("Failed to deliver config notification response");
103113
break;
104114

115+
case TEvInterconnect::EvListNodes:
116+
LOG_W("Failed to deliver list nodes request");
117+
ScheduleNodeInfoRequest();
118+
break;
119+
105120
default:
106121
LOG_E("Undelivered event with unexpected source type: " << ev->Get()->SourceType);
107122
break;
@@ -145,13 +160,18 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
145160
case EWakeUp::StartCpuLoadRequest:
146161
RunCpuLoadRequest();
147162
break;
163+
164+
case EWakeUp::StartNodeInfoRequest:
165+
RunNodeInfoRequest();
166+
break;
148167
}
149168
}
150169

151170
STRICT_STFUNC(MainState,
152171
sFunc(TEvents::TEvPoison, HandlePoison);
153172
sFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, HandleSetConfigSubscriptionResponse);
154173
hFunc(NConsole::TEvConsole::TEvConfigNotificationRequest, Handle);
174+
hFunc(TEvInterconnect::TEvNodesInfo, Handle);
155175
hFunc(TEvents::TEvUndelivered, Handle);
156176

157177
hFunc(TEvPlaceRequestIntoPool, Handle);
@@ -160,6 +180,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
160180

161181
hFunc(TEvPrivate::TEvResolvePoolResponse, Handle);
162182
hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle);
183+
hFunc(TEvPrivate::TEvNodesInfoRequest, Handle);
163184
hFunc(TEvPrivate::TEvRefreshPoolState, Handle);
164185
hFunc(TEvPrivate::TEvCpuQuotaRequest, Handle);
165186
hFunc(TEvPrivate::TEvFinishRequestInPool, Handle);
@@ -214,6 +235,10 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
214235
}
215236
}
216237

238+
// Replies to the requester with the most recently stored node count.
void Handle(TEvPrivate::TEvNodesInfoRequest::TPtr& ev) const {
    const auto& requester = ev->Sender;
    auto* response = new TEvPrivate::TEvNodesInfoResponse(NodeCount);
    Send(requester, response);
}
241+
217242
void Handle(TEvPrivate::TEvRefreshPoolState::TPtr& ev) {
218243
const auto& event = ev->Get()->Record;
219244
const TString& database = event.GetDatabase();
@@ -333,6 +358,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
333358

334359
LOG_I("Started workload service initialization");
335360
Register(CreateCleanupTablesActor());
361+
RunNodeInfoRequest();
336362
}
337363

338364
void PrepareWorkloadServiceTables() {
@@ -420,6 +446,14 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
420446
Register(CreateCpuLoadFetcherActor(SelfId()));
421447
}
422448

449+
// Arms a delayed wake-up (after IDLE_DURATION * 2) that re-requests the node
// list. The wake-up must be tagged StartNodeInfoRequest: the wake-up handler
// dispatches that tag to RunNodeInfoRequest(), whereas StartCpuLoadRequest
// would start a CPU load fetch and the node count would never be refreshed.
void ScheduleNodeInfoRequest() const {
    Schedule(IDLE_DURATION * 2, new TEvents::TEvWakeup(static_cast<ui64>(EWakeUp::StartNodeInfoRequest)));
}
452+
453+
// Asks the nameservice actor for the node list. Delivery tracking is
// requested so that a failed delivery comes back as TEvUndelivered.
void RunNodeInfoRequest() const {
    const auto nameservice = GetNameserviceActorId();
    Send(nameservice, new TEvInterconnect::TEvListNodes(), IEventHandle::FlagTrackDelivery);
}
456+
423457
private:
424458
void ReplyContinueError(const TActorId& replyActorId, Ydb::StatusIds::StatusCode status, const TString& message) const {
425459
ReplyContinueError(replyActorId, status, {NYql::TIssue(message)});
@@ -494,6 +528,7 @@ class TKqpWorkloadService : public TActorBootstrapped<TKqpWorkloadService> {
494528
std::unordered_set<TString> DatabasesWithDefaultPool;
495529
std::unordered_map<TString, TPoolState> PoolIdToState;
496530
std::unique_ptr<TCpuQuotaManagerState> CpuQuotaManager;
531+
ui32 NodeCount = 0;
497532

498533
NMonitoring::TDynamicCounters::TCounterPtr ActivePools;
499534
};

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
460460
}
461461
}
462462

463+
// Common helpers
463464
// Exposes the test actor runtime owned by the underlying server.
TTestActorRuntime* GetRuntime() const override {
    TTestActorRuntime* const runtime = Server_->GetRuntime();
    return runtime;
}
@@ -491,19 +492,6 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
491492
return event;
492493
}
493494

494-
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
495-
TInstant start = TInstant::Now();
496-
while (TInstant::Now() - start <= timeout) {
497-
TString errorString;
498-
if (callback(errorString)) {
499-
return;
500-
}
501-
Cerr << "Wait " << description << " " << TInstant::Now() - start << ": " << errorString << "\n";
502-
Sleep(TDuration::Seconds(1));
503-
}
504-
UNIT_ASSERT_C(false, "Waiting " << description << " timeout");
505-
}
506-
507495
NMonitoring::TDynamicCounterPtr GetWorkloadManagerCounters(ui32 nodeIndex) const {
508496
return GetServiceCounters(GetRuntime()->GetAppData(nodeIndex).Counters, "kqp")
509497
->GetSubgroup("subsystem", "workload_manager");
@@ -598,6 +586,21 @@ TIntrusivePtr<IYdbSetup> TYdbSetupSettings::Create() const {
598586
return MakeIntrusive<TWorkloadServiceYdbSetup>(*this);
599587
}
600588

589+
//// IYdbSetup
590+
591+
// Polls `callback` until it returns true or `timeout` has elapsed.
//
// timeout     - upper bound on the elapsed time at which a new attempt may start.
// description - human-readable name of the awaited condition, used in logs
//               and in the failure message.
// callback    - predicate; may fill its TString argument with details that
//               are printed after every unsuccessful attempt.
//
// Sleeps one second between attempts; fails the unit test on timeout.
void IYdbSetup::WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback) {
    const TInstant startedAt = TInstant::Now();

    // The sleep is the loop step, so it only runs after an unsuccessful attempt.
    for (; TInstant::Now() - startedAt <= timeout; Sleep(TDuration::Seconds(1))) {
        TString lastError;
        const bool conditionMet = callback(lastError);
        if (conditionMet) {
            return;
        }
        Cerr << "Wait " << description << " " << TInstant::Now() - startedAt << ": " << lastError << "\n";
    }

    UNIT_ASSERT_C(false, "Waiting " << description << " timeout. Spent time " << TInstant::Now() - startedAt << " exceeds limit " << timeout);
}
603+
601604
//// TSampleQueries
602605

603606
void TSampleQueries::CompareYson(const TString& expected, const TString& actual) {

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ class IYdbSetup : public TThrRefBase {
106106
virtual void StopWorkloadService(ui64 nodeIndex = 0) const = 0;
107107
virtual void ValidateWorkloadServiceCounters(bool checkTableCounters = true, const TString& poolId = "") const = 0;
108108

109+
// Common helpers
109110
virtual TTestActorRuntime* GetRuntime() const = 0;
110111
virtual const TYdbSetupSettings& GetSettings() const = 0;
112+
static void WaitFor(TDuration timeout, TString description, std::function<bool(TString&)> callback);
111113
};
112114

113115
// Test queries

ydb/core/kqp/workload_service/ut/kqp_workload_service_tables_ut.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,22 +133,26 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceTables) {
133133
Y_UNIT_TEST(TestLeaseExpiration) {
134134
auto ydb = TYdbSetupSettings()
135135
.ConcurrentQueryLimit(1)
136+
.QueryCancelAfter(TDuration::Zero())
136137
.Create();
137138

138139
// Create tables
139-
TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query));
140+
auto hangingRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().HangUpDuringExecution(true));
141+
ydb->WaitQueryExecution(hangingRequest);
140142

141-
const TDuration leaseDuration = TDuration::Seconds(10);
142-
StartRequest(ydb, "test_session", leaseDuration);
143-
DelayRequest(ydb, "test_session", leaseDuration);
144-
CheckPoolDescription(ydb, 1, 1, leaseDuration);
143+
auto delayedRequest = ydb->ExecuteQueryAsync(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().ExecutionExpected(false));
144+
ydb->WaitPoolState({.DelayedRequests = 1, .RunningRequests = 1});
145145

146146
ydb->StopWorkloadService();
147147
ydb->WaitPoolHandlersCount(0);
148148

149149
// Check that lease expired
150-
Sleep(leaseDuration + TDuration::Seconds(5));
151-
CheckPoolDescription(ydb, 0, 0);
150+
IYdbSetup::WaitFor(TDuration::Seconds(60), "lease expiration", [ydb](TString& errorString) {
151+
auto description = ydb->GetPoolDescription(TDuration::Zero());
152+
153+
errorString = TStringBuilder() << "delayed = " << description.DelayedRequests << ", running = " << description.RunningRequests;
154+
return description.AmountRequests() == 0;
155+
});
152156
}
153157

154158
Y_UNIT_TEST(TestLeaseUpdates) {

ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -444,19 +444,16 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) {
444444
DROP RESOURCE POOL )" << poolId << ";"
445445
);
446446

447-
TInstant start = TInstant::Now();
448-
while (TInstant::Now() - start <= FUTURE_WAIT_TIMEOUT) {
449-
if (ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown) {
450-
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
451-
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
452-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
453-
return;
454-
}
455-
456-
Cerr << "WaitPoolDrop " << TInstant::Now() - start << "\n";
457-
Sleep(TDuration::Seconds(1));
458-
}
459-
UNIT_ASSERT_C(false, "Pool drop waiting timeout");
447+
IYdbSetup::WaitFor(FUTURE_WAIT_TIMEOUT, "pool drop", [ydb, poolId](TString& errorString) {
448+
auto kind = ydb->Navigate(TStringBuilder() << ".resource_pools/" << poolId)->ResultSet.at(0).Kind;
449+
450+
errorString = TStringBuilder() << "kind = " << kind;
451+
return kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindUnknown;
452+
});
453+
454+
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings);
455+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::NOT_FOUND, result.GetIssues().ToString());
456+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), TStringBuilder() << "Resource pool " << poolId << " not found");
460457
}
461458

462459
Y_UNIT_TEST(TestResourcePoolAcl) {

ydb/core/kqp/workload_service/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ PEERDIR(
1010
ydb/core/fq/libs/compute/common
1111

1212
ydb/core/kqp/workload_service/actors
13+
14+
ydb/library/actors/interconnect
1315
)
1416

1517
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)