Skip to content

Commit 8ccfd2a

Browse files
authored
Merge d60772c into 732bf12
2 parents 732bf12 + d60772c commit 8ccfd2a

File tree

8 files changed

+55
-21
lines changed

8 files changed

+55
-21
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+9-6
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
684684
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
685685
InternalError(issues);
686686
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
687-
TimeoutError(ev->Sender);
687+
TimeoutError(ev->Sender, issues);
688688
} else {
689689
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
690690
}
@@ -1706,29 +1706,32 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17061706
ReplyErrorAndDie(status, &issues);
17071707
}
17081708

1709-
void TimeoutError(TActorId abortSender) {
1709+
void TimeoutError(TActorId abortSender, NYql::TIssues issues) {
17101710
if (AlreadyReplied) {
17111711
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17121712
return;
17131713
}
17141714

17151715
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1716-
const TString message = "Request timeout exceeded";
1716+
if (issues.Empty()) {
1717+
issues.AddIssue("Request timeout exceeded");
1718+
}
17171719

1718-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1720+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, issues);
17191721

17201722
AlreadyReplied = true;
17211723

1722-
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
1724+
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << ", " << issues.ToOneLineString());
17231725
if (ExecuterSpan) {
17241726
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
17251727
}
17261728

17271729
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::TIMEOUT);
1730+
NYql::IssuesToMessage(issues, ResponseEv->Record.MutableResponse()->MutableIssues());
17281731

17291732
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17301733
if (abortSender != Target) {
1731-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
1734+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, issues);
17321735
this->Send(Target, abortEv.Release());
17331736
}
17341737

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

+6-4
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,14 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
148148
struct TEvOnRequestTimeout: public TEventLocal<TEvOnRequestTimeout, EEv::EvOnRequestTimeout> {
149149
ui64 RequestId;
150150
TDuration Timeout;
151+
TDuration InitialTimeout;
151152
NYql::NDqProto::StatusIds::StatusCode Status;
152153
int Round;
153154

154155
TEvOnRequestTimeout(ui64 requestId, TDuration timeout, NYql::NDqProto::StatusIds::StatusCode status, int round)
155156
: RequestId(requestId)
156157
, Timeout(timeout)
158+
, InitialTimeout(timeout)
157159
, Status(status)
158160
, Round(round)
159161
{}
@@ -1283,9 +1285,9 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
12831285

12841286
const TKqpSessionInfo* info = LocalSessions->FindPtr(reqInfo->SessionId);
12851287
if (msg->Round == 0 && info) {
1286-
TString message = TStringBuilder()
1287-
<< "request's " << (msg->Status == NYql::NDqProto::StatusIds::TIMEOUT ? "timeout" : "cancelAfter")
1288-
<< " exceeded";
1288+
TString message = msg->Status == NYql::NDqProto::StatusIds::TIMEOUT
1289+
? (TStringBuilder() << "Request timeout " << msg->Timeout.MilliSeconds() << "ms exceeded")
1290+
: (TStringBuilder() << "Request canceled after " << msg->Timeout.MilliSeconds() << "ms");
12891291

12901292
Send(info->WorkerId, new TEvKqp::TEvAbortExecution(msg->Status, message));
12911293

@@ -1297,7 +1299,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
12971299
}
12981300
} else {
12991301
TString message = TStringBuilder()
1300-
<< "Query did not complete within specified timeout, session id " << reqInfo->SessionId;
1302+
<< "Query did not complete within specified timeout " << msg->InitialTimeout.MilliSeconds() << "ms, session id " << reqInfo->SessionId;
13011303
ReplyProcessError(NYql::NDq::DqStatusToYdbStatus(msg->Status), message, requestId);
13021304
}
13031305
}

ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp

+14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/library/ydb_issue/issue_helpers.h>
44
#include <ydb/core/kqp/common/events/events.h>
55
#include <ydb/core/kqp/common/kqp.h>
6+
#include <ydb/core/kqp/common/kqp_timeouts.h>
67
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
78
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
89
#include <ydb/core/kqp/proxy_service/proto/result_set_meta.pb.h>
@@ -216,6 +217,12 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
216217
WaitFinalizationRequest = true;
217218
RunState = IsExecuting() ? ERunState::Finishing : RunState;
218219

220+
if (RunState == ERunState::Cancelling) {
221+
NYql::TIssue cancelIssue("Request was canceled by user");
222+
cancelIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
223+
Issues.AddIssue(std::move(cancelIssue));
224+
}
225+
219226
auto scriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(
220227
GetFinalizationStatusFromRunState(), ExecutionId, Database, Status, GetExecStatusFromStatusCode(Status),
221228
Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst), LeaseGeneration
@@ -424,6 +431,13 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
424431
const auto& issueMessage = record.GetResponse().GetQueryIssues();
425432
NYql::IssuesFromMessage(issueMessage, Issues);
426433

434+
if (record.GetYdbStatus() == Ydb::StatusIds::TIMEOUT) {
435+
const TDuration timeout = GetQueryTimeout(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, Request.GetRequest().GetTimeoutMs(), {}, QueryServiceConfig);
436+
NYql::TIssue timeoutIssue(TStringBuilder() << "Current request timeout is " << timeout.MilliSeconds() << "ms");
437+
timeoutIssue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
438+
Issues.AddIssue(std::move(timeoutIssue));
439+
}
440+
427441
if (record.GetResponse().HasQueryPlan()) {
428442
QueryPlan = record.GetResponse().GetQueryPlan();
429443
}

ydb/core/kqp/session_actor/kqp_query_state.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN
138138

139139

140140
bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
141+
CompilationRunning = false;
141142
CompileResult = ev->CompileResult;
142143
YQL_ENSURE(CompileResult);
143144
MaxReadType = CompileResult->MaxReadType;

ydb/core/kqp/session_actor/kqp_query_state.h

+1
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class TKqpQueryState : public TNonCopyable {
128128
bool KeepSession = false;
129129
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
130130
NActors::TMonotonic StartedAt;
131+
bool CompilationRunning = false;
131132

132133
THashMap<NKikimr::TTableId, ui64> TableVersions;
133134

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+14-9
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
524524

525525
void CompileQuery() {
526526
YQL_ENSURE(QueryState);
527+
QueryState->CompilationRunning = true;
527528
auto ev = QueryState->BuildCompileRequest(CompilationCookie, GUCSettings);
528529
LOG_D("Sending CompileQuery request");
529530

@@ -1528,16 +1529,22 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
15281529
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
15291530
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
15301531

1531-
TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
1532-
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
1533-
<< " milliseconds.";
1532+
auto issues = ev->Get()->GetIssues();
1533+
TStringBuilder reason = TStringBuilder() << "Cancelling after " << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds() << "ms";
1534+
if (QueryState->CompilationRunning) {
1535+
reason << " during compilation";
1536+
} else if (ExecuterId) {
1537+
reason << " during execution";
1538+
} else {
1539+
reason << " in " << CurrentStateFuncName();
1540+
}
1541+
issues.AddIssue(reason);
15341542

15351543
if (ExecuterId) {
1536-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
1544+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), issues);
15371545
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
15381546
} else {
1539-
const auto& issues = ev->Get()->GetIssues();
1540-
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
1547+
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), "", MessageFromIssues(issues));
15411548
}
15421549
}
15431550

@@ -2279,9 +2286,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
22792286

22802287
void Handle(TEvKqp::TEvCancelQueryRequest::TPtr& ev) {
22812288
{
2282-
auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>();
2283-
abort->Record.SetStatusCode(NYql::NDqProto::StatusIds::CANCELLED);
2284-
abort->Record.AddIssues()->set_message("Canceled");
2289+
auto abort = MakeHolder<NYql::NDq::TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::CANCELLED, "Request was canceled");
22852290
Send(SelfId(), abort.Release());
22862291
}
22872292

ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ struct TSampleQueries {
131131
template <typename TResult>
132132
static void CheckCancelled(const TResult& result) {
133133
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::CANCELLED, result.GetIssues().ToString());
134-
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Request timeout exceeded, cancelling after");
134+
UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Request was canceled");
135135
}
136136

137137
template <typename TResult>

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
109109
public:
110110
void Bootstrap() {
111111
try {
112+
StartTime = TInstant::Now();
112113
{
113114
TStringBuilder prefixBuilder;
114115
prefixBuilder << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ";
@@ -1049,8 +1050,14 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10491050
);
10501051
}
10511052

1053+
TStringBuilder reason = TStringBuilder() << "Task execution timeout ";
1054+
if (RuntimeSettings.Timeout) {
1055+
reason << RuntimeSettings.Timeout->MilliSeconds() << "ms ";
1056+
}
1057+
reason << "exceeded, terminating after " << (TInstant::Now() - StartTime).MilliSeconds() << "ms";
1058+
10521059
State = NDqProto::COMPUTE_STATE_FAILURE;
1053-
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true);
1060+
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue(reason)}, true);
10541061
break;
10551062
}
10561063
case EEvWakeupTag::PeriodicStatsTag: {
@@ -1928,6 +1935,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
19281935
NWilson::TSpan ComputeActorSpan;
19291936
TDuration SourceCpuTime;
19301937
private:
1938+
TInstant StartTime;
19311939
bool Running = true;
19321940
TInstant LastSendStatsTime;
19331941
bool PassExceptions = false;

0 commit comments

Comments
 (0)