Skip to content

Commit b53600a

Browse files
authored
YQ kqprun added session settings (#9589)
1 parent 4b09967 commit b53600a

File tree

8 files changed

+307
-63
lines changed

8 files changed

+307
-63
lines changed

ydb/tests/tools/kqprun/kqprun.cpp

+77-38
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct TExecutionOptions {
3636

3737
ui32 LoopCount = 1;
3838
TDuration LoopDelay;
39+
bool ContinueAfterFail = false;
3940

4041
bool ForgetExecution = false;
4142
std::vector<EExecutionCase> ExecutionCases;
@@ -44,6 +45,7 @@ struct TExecutionOptions {
4445
std::vector<TString> TraceIds;
4546
std::vector<TString> PoolIds;
4647
std::vector<TString> UserSIDs;
48+
std::vector<TDuration> Timeouts;
4749
ui64 ResultsRowsLimit = 0;
4850

4951
const TString DefaultTraceId = "kqprun";
@@ -87,7 +89,8 @@ struct TExecutionOptions {
8789
.TraceId = DefaultTraceId,
8890
.PoolId = "",
8991
.UserSID = BUILTIN_ACL_ROOT,
90-
.Database = ""
92+
.Database = "",
93+
.Timeout = TDuration::Zero()
9194
};
9295
}
9396

@@ -106,7 +109,8 @@ struct TExecutionOptions {
106109
.TraceId = TStringBuilder() << GetValue(index, TraceIds, DefaultTraceId) << "-" << startTime.ToString(),
107110
.PoolId = GetValue(index, PoolIds, TString()),
108111
.UserSID = GetValue(index, UserSIDs, TString(BUILTIN_ACL_ROOT)),
109-
.Database = GetValue(index, Databases, TString())
112+
.Database = GetValue(index, Databases, TString()),
113+
.Timeout = GetValue(index, Timeouts, TDuration::Zero())
110114
};
111115
}
112116

@@ -136,6 +140,7 @@ struct TExecutionOptions {
136140
checker(TraceIds.size(), "trace ids");
137141
checker(PoolIds.size(), "pool ids");
138142
checker(UserSIDs.size(), "user SIDs");
143+
checker(Timeouts.size(), "timeouts");
139144
}
140145

141146
void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
@@ -148,6 +153,10 @@ struct TExecutionOptions {
148153
}
149154

150155
void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
156+
if (runnerOptions.YdbSettings.SameSession && HasExecutionCase(EExecutionCase::AsyncQuery)) {
157+
ythrow yexception() << "Same session can not be used with async quries";
158+
}
159+
151160
// Script specific
152161
if (HasExecutionCase(EExecutionCase::GenericScript)) {
153162
return;
@@ -180,6 +189,9 @@ struct TExecutionOptions {
180189
if (runnerOptions.ScriptQueryPlanOutput) {
181190
ythrow yexception() << "Script query plan output can not be used without script/yql queries";
182191
}
192+
if (runnerOptions.YdbSettings.SameSession) {
193+
ythrow yexception() << "Same session can not be used without script/yql queries";
194+
}
183195
}
184196

185197
void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const {
@@ -237,6 +249,49 @@ struct TExecutionOptions {
237249
};
238250

239251

252+
void RunArgumentQuery(size_t index, size_t queryId, TInstant startTime, const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) {
253+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
254+
255+
switch (executionOptions.GetExecutionCase(index)) {
256+
case TExecutionOptions::EExecutionCase::GenericScript: {
257+
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
258+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
259+
}
260+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
261+
if (!runner.FetchScriptResults()) {
262+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed";
263+
}
264+
if (executionOptions.ForgetExecution) {
265+
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Forgetting script execution operation..." << colors.Default() << Endl;
266+
if (!runner.ForgetExecutionOperation()) {
267+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Forget script execution operation failed";
268+
}
269+
}
270+
break;
271+
}
272+
273+
case TExecutionOptions::EExecutionCase::GenericQuery: {
274+
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
275+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
276+
}
277+
break;
278+
}
279+
280+
case TExecutionOptions::EExecutionCase::YqlScript: {
281+
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(index, queryId, startTime))) {
282+
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
283+
}
284+
break;
285+
}
286+
287+
case TExecutionOptions::EExecutionCase::AsyncQuery: {
288+
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(index, queryId, startTime));
289+
break;
290+
}
291+
}
292+
}
293+
294+
240295
void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqpRunner& runner) {
241296
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
242297

@@ -256,8 +311,7 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp
256311
}
257312

258313
const TInstant startTime = TInstant::Now();
259-
const auto executionCase = executionOptions.GetExecutionCase(id);
260-
if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) {
314+
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
261315
Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing script";
262316
if (numberQueries > 1) {
263317
Cout << " " << id;
@@ -268,41 +322,17 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp
268322
Cout << "..." << colors.Default() << Endl;
269323
}
270324

271-
switch (executionCase) {
272-
case TExecutionOptions::EExecutionCase::GenericScript:
273-
if (!runner.ExecuteScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) {
274-
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
275-
}
276-
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
277-
if (!runner.FetchScriptResults()) {
278-
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed";
279-
}
280-
if (executionOptions.ForgetExecution) {
281-
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Forgetting script execution operation..." << colors.Default() << Endl;
282-
if (!runner.ForgetExecutionOperation()) {
283-
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Forget script execution operation failed";
284-
}
285-
}
286-
break;
287-
288-
case TExecutionOptions::EExecutionCase::GenericQuery:
289-
if (!runner.ExecuteQuery(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) {
290-
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
291-
}
292-
break;
293-
294-
case TExecutionOptions::EExecutionCase::YqlScript:
295-
if (!runner.ExecuteYqlScript(executionOptions.GetScriptQueryOptions(id, queryId, startTime))) {
296-
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
325+
try {
326+
RunArgumentQuery(id, queryId, startTime, executionOptions, runner);
327+
} catch (const yexception& exception) {
328+
if (executionOptions.ContinueAfterFail) {
329+
Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
330+
} else {
331+
throw exception;
297332
}
298-
break;
299-
300-
case TExecutionOptions::EExecutionCase::AsyncQuery:
301-
runner.ExecuteQueryAsync(executionOptions.GetScriptQueryOptions(id, queryId, startTime));
302-
break;
303333
}
304334
}
305-
runner.WaitAsyncQueries();
335+
runner.FinalizeRunner();
306336

307337
if (executionOptions.HasResults()) {
308338
try {
@@ -612,9 +642,11 @@ class TMain : public TMainClassArgs {
612642
ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice));
613643
});
614644

615-
options.AddLongOption("timeout", "Reauests timeout in milliseconds")
645+
options.AddLongOption("timeout", "Timeout in milliseconds for -p queries")
616646
.RequiredArgument("uint")
617-
.StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds<ui64>);
647+
.Handler1([this](const NLastGetopt::TOptsParser* option) {
648+
ExecutionOptions.Timeouts.emplace_back(TDuration::MilliSeconds<ui64>(FromString(option->CurValOrDef())));
649+
});
618650

619651
options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds")
620652
.RequiredArgument("uint")
@@ -632,6 +664,9 @@ class TMain : public TMainClassArgs {
632664
.RequiredArgument("uint")
633665
.DefaultValue(0)
634666
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);
667+
options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
668+
.NoArgument()
669+
.SetFlag(&ExecutionOptions.ContinueAfterFail);
635670

636671
options.AddLongOption('D', "database", "Database path for -p queries")
637672
.RequiredArgument("path")
@@ -645,6 +680,10 @@ class TMain : public TMainClassArgs {
645680
.RequiredArgument("pool-id")
646681
.EmplaceTo(&ExecutionOptions.PoolIds);
647682

683+
options.AddLongOption("same-session", "Run all -p requests in one session")
684+
.NoArgument()
685+
.SetFlag(&RunnerOptions.YdbSettings.SameSession);
686+
648687
// Cluster settings
649688

650689
options.AddLongOption('N', "node-count", "Number of nodes to create")

ydb/tests/tools/kqprun/src/actors.cpp

+113-2
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
179179

180180
MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
181181
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
182-
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
182+
Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
183183
}
184184

185185
RequestId_++;
@@ -193,7 +193,7 @@ class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
193193
}
194194

195195
if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
196-
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
196+
Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
197197
}
198198

199199
FinalizePromise_->SetValue();
@@ -292,6 +292,113 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
292292
std::shared_ptr<NKikimr::NKqp::NRm::IKqpResourceManager> ResourceManager_;
293293
};
294294

295+
class TSessionHolderActor : public NActors::TActorBootstrapped<TSessionHolderActor> {
296+
public:
297+
TSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise)
298+
: TargetNode_(request.TargetNode)
299+
, TraceId_(request.Event->Record.GetTraceId())
300+
, Request_(std::move(request.Event))
301+
, OpenPromise_(openPromise)
302+
, ClosePromise_(closePromise)
303+
{}
304+
305+
void Bootstrap() {
306+
Become(&TSessionHolderActor::StateFunc);
307+
Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode_), std::move(Request_));
308+
}
309+
310+
void Handle(NKikimr::NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev) {
311+
const auto& response = ev->Get()->Record;
312+
if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
313+
FailAndPassAway(TStringBuilder() << "Failed to create session, " << response.GetYdbStatus() << ", reason: " << response.GetError() << "\n");
314+
return;
315+
}
316+
317+
SessionId_ = response.GetResponse().GetSessionId();
318+
Cout << CoutColors_.Cyan() << "Created new session on node " << TargetNode_ << " with id " << SessionId_ << "\n";
319+
320+
PingSession();
321+
}
322+
323+
void PingSession() {
324+
auto event = std::make_unique<NKikimr::NKqp::TEvKqp::TEvPingSessionRequest>();
325+
event->Record.SetTraceId(TraceId_);
326+
event->Record.MutableRequest()->SetSessionId(SessionId_);
327+
NActors::ActorIdToProto(SelfId(), event->Record.MutableRequest()->MutableExtSessionCtrlActorId());
328+
329+
Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode_), std::move(event));
330+
}
331+
332+
void Handle(NKikimr::NKqp::TEvKqp::TEvPingSessionResponse::TPtr& ev) {
333+
const auto& response = ev->Get()->Record;
334+
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
335+
NYql::TIssues issues;
336+
NYql::IssuesFromMessage(response.GetIssues(), issues);
337+
FailAndPassAway(TStringBuilder() << "Failed to ping session, " << response.GetStatus() << ", reason:\n" << issues.ToString() << "\n");
338+
return;
339+
}
340+
341+
if (!OpenPromise_.HasValue()) {
342+
OpenPromise_.SetValue(SessionId_);
343+
}
344+
345+
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup());
346+
}
347+
348+
void CloseSession() {
349+
if (!SessionId_) {
350+
FailAndPassAway("Failed to close session, creation is not finished");
351+
return;
352+
}
353+
354+
auto event = std::make_unique<NKikimr::NKqp::TEvKqp::TEvCloseSessionRequest>();
355+
event->Record.SetTraceId(TraceId_);
356+
event->Record.MutableRequest()->SetSessionId(SessionId_);
357+
358+
Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode_), std::move(event));
359+
}
360+
361+
void Handle(NKikimr::NKqp::TEvKqp::TEvCloseSessionResponse::TPtr& ev) {
362+
const auto& response = ev->Get()->Record;
363+
if (response.GetStatus() != Ydb::StatusIds::SUCCESS) {
364+
NYql::TIssues issues;
365+
NYql::IssuesFromMessage(response.GetIssues(), issues);
366+
FailAndPassAway(TStringBuilder() << "Failed to close session, " << response.GetStatus() << ", reason:\n" << issues.ToString() << "\n");
367+
return;
368+
}
369+
370+
ClosePromise_.SetValue();
371+
PassAway();
372+
}
373+
374+
STRICT_STFUNC(StateFunc,
375+
hFunc(NKikimr::NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
376+
hFunc(NKikimr::NKqp::TEvKqp::TEvPingSessionResponse, Handle);
377+
hFunc(NKikimr::NKqp::TEvKqp::TEvCloseSessionResponse, Handle);
378+
sFunc(NActors::TEvents::TEvWakeup, PingSession);
379+
sFunc(NActors::TEvents::TEvPoison, CloseSession);
380+
)
381+
382+
private:
383+
void FailAndPassAway(const TString& error) {
384+
if (!OpenPromise_.HasValue()) {
385+
OpenPromise_.SetException(error);
386+
}
387+
ClosePromise_.SetException(error);
388+
PassAway();
389+
}
390+
391+
private:
392+
const ui32 TargetNode_;
393+
const TString TraceId_;
394+
const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
395+
396+
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Request_;
397+
NThreading::TPromise<TString> OpenPromise_;
398+
NThreading::TPromise<void> ClosePromise_;
399+
TString SessionId_;
400+
};
401+
295402
} // anonymous namespace
296403

297404
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) {
@@ -306,4 +413,8 @@ NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise,
306413
return new TResourcesWaiterActor(promise, expectedNodeCount);
307414
}
308415

416+
NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise) {
417+
return new TSessionHolderActor(std::move(request), openPromise, closePromise);
418+
}
419+
309420
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/actors.h

+7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ struct TQueryRequest {
2020
ui64 ResultSizeLimit;
2121
};
2222

23+
struct TCreateSessionRequest {
24+
std::unique_ptr<NKikimr::NKqp::TEvKqp::TEvCreateSessionRequest> Event;
25+
ui32 TargetNode;
26+
};
27+
2328
struct TEvPrivate {
2429
enum EEv : ui32 {
2530
EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
@@ -78,4 +83,6 @@ NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settin
7883

7984
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
8085

86+
NActors::IActor* CreateSessionHolderActor(TCreateSessionRequest request, NThreading::TPromise<TString> openPromise, NThreading::TPromise<void> closePromise);
87+
8188
} // namespace NKqpRun

ydb/tests/tools/kqprun/src/common.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ struct TYdbSetupSettings {
3232
std::unordered_set<TString> SharedTenants;
3333
std::unordered_set<TString> ServerlessTenants;
3434
TDuration InitializationTimeout = TDuration::Seconds(10);
35-
TDuration RequestsTimeout;
35+
bool SameSession = false;
3636

3737
bool DisableDiskMock = false;
3838
bool UseRealPDisks = false;
@@ -94,6 +94,7 @@ struct TRequestOptions {
9494
TString PoolId;
9595
TString UserSID;
9696
TString Database;
97+
TDuration Timeout;
9798
};
9899

99100
} // namespace NKqpRun

0 commit comments

Comments
 (0)