Skip to content

Commit e981908

Browse files
committed
Added resource waiting
1 parent 9f865f6 commit e981908

File tree

4 files changed

+71
-0
lines changed

4 files changed

+71
-0
lines changed

ydb/tests/tools/kqprun/src/actors.cpp

+53
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "actors.h"
22

33
#include <ydb/core/kqp/common/simple/services.h>
4+
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
45

56

67
namespace NKqpRun {
@@ -91,6 +92,54 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
9192
TProgressCallback ProgressCallback_;
9293
};
9394

95+
class TResourceWaiterActor : public NActors::TActorBootstrapped<TResourceWaiterActor> {
96+
public:
97+
TResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount)
98+
: ExpectedNodeCount_(expectedNodeCount)
99+
, Promise_(promise)
100+
{}
101+
102+
void Bootstrap() {
103+
GetResourceManager();
104+
WaitResourcePublish();
105+
106+
Promise_.SetValue();
107+
PassAway();
108+
}
109+
110+
private:
111+
void GetResourceManager() {
112+
while (true) {
113+
ResourceManager_ = NKikimr::NKqp::TryGetKqpResourceManager(SelfId().NodeId());
114+
if (ResourceManager_) {
115+
break;
116+
}
117+
118+
Sleep(TDuration::MilliSeconds(10));
119+
}
120+
}
121+
122+
void WaitResourcePublish() {
123+
while (true) {
124+
auto resourcesPromise = NThreading::NewPromise<i32>();
125+
ResourceManager_->RequestClusterResourcesInfo([resourcesPromise](TVector<NKikimrKqp::TKqpNodeResources>&& resources) mutable {
126+
resourcesPromise.SetValue(resources.size());
127+
});
128+
129+
if (resourcesPromise.GetFuture().GetValueSync() == ExpectedNodeCount_) {
130+
break;
131+
}
132+
Sleep(TDuration::MilliSeconds(10));
133+
}
134+
}
135+
136+
private:
137+
const i32 ExpectedNodeCount_;
138+
139+
NThreading::TPromise<void> Promise_;
140+
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
141+
};
142+
94143
} // anonymous namespace
95144

96145
NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
@@ -100,4 +149,8 @@ NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQuer
100149
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback);
101150
}
102151

152+
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
153+
return new TResourceWaiterActor(promise, expectedNodeCount);
154+
}
155+
103156
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/actors.h

+2
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQuer
1111
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
1212
TProgressCallback progressCallback);
1313

14+
NActors::IActor* CreateResourceWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
15+
1416
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/common.h

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
1616
struct TYdbSetupSettings {
1717
i32 NodeCount = 1;
1818
TString DomainName = "Root";
19+
TDuration InitializationTimeout = TDuration::Seconds(10);
1920

2021
bool TraceOptEnabled = false;
2122
TMaybe<TString> LogOutputFile;

ydb/tests/tools/kqprun/src/ydb_setup.cpp

+15
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,27 @@ class TYdbSetup::TImpl {
163163
NYql::NLog::InitLogger(NActors::CreateNullBackend());
164164
}
165165

166+
void WaitResourcePublishing() const {
167+
auto promise = NThreading::NewPromise();
168+
GetRuntime()->Register(CreateResourceWaiterActor(promise, Settings_.NodeCount));
169+
170+
try {
171+
promise.GetFuture().GetValue(Settings_.InitializationTimeout);
172+
} catch (...) {
173+
ythrow yexception() << "Failed to initialize all resources: " << CurrentExceptionMessage();
174+
}
175+
}
176+
166177
public:
167178
explicit TImpl(const TYdbSetupSettings& settings)
168179
: Settings_(settings)
169180
{
170181
InitializeYqlLogger();
171182
InitializeServer();
183+
184+
if (Settings_.NodeCount > 1) {
185+
WaitResourcePublishing();
186+
}
172187
}
173188

174189
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr SchemeQueryRequest(const TString& query, const TString& traceId) const {

0 commit comments

Comments
 (0)