Skip to content

Commit 4971804

Browse files
authored
Merge 64e1e8c into 1d4f8fb
2 parents 1d4f8fb + 64e1e8c commit 4971804

File tree

6 files changed

+125
-30
lines changed

6 files changed

+125
-30
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,13 +205,17 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
205205
}
206206

207207
void Finalize() {
208+
YQL_ENSURE(!AlreadyReplied);
209+
208210
if (LocksBroken) {
209211
TString message = "Transaction locks invalidated.";
210212

211213
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED,
212214
YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message));
213215
}
214216

217+
AlreadyReplied = true;
218+
215219
auto& response = *ResponseEv->Record.MutableResponse();
216220

217221
FillResponseStats(Ydb::StatusIds::SUCCESS);
@@ -278,8 +282,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
278282
ExecuterSpan.EndOk();
279283

280284
Request.Transactions.crop(0);
281-
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
282-
Send(Target, ResponseEv.release());
283285
PassAway();
284286
}
285287

@@ -319,6 +321,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
319321
return "WaitSnapshotState";
320322
} else if (func == &TThis::WaitResolveState) {
321323
return "WaitResolveState";
324+
} else if (func == &TThis::WaitShutdownState) {
325+
return "WaitShutdownState";
322326
} else {
323327
return TBase::CurrentStateFuncName();
324328
}
@@ -2596,6 +2600,76 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25962600
}
25972601

25982602
void PassAway() override {
2603+
if (Planner) {
2604+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2605+
DoShutdown();
2606+
} else {
2607+
this->Become(&TThis::WaitShutdownState);
2608+
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2609+
<< Planner->GetPendingComputeActors().size() << " compute actors");
2610+
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
2611+
}
2612+
} else {
2613+
DoShutdown();
2614+
}
2615+
}
2616+
2617+
STATEFN(WaitShutdownState) {
2618+
switch(ev->GetTypeRewrite()) {
2619+
hFunc(TEvDqCompute::TEvState, HandleShutdown);
2620+
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2621+
hFunc(TEvents::TEvPoison, HandleShutdown);
2622+
default:
2623+
; // ignore all other events
2624+
}
2625+
}
2626+
2627+
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2628+
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FINISHED) {
2629+
YQL_ENSURE(Planner);
2630+
2631+
TActorId actor = ev->Sender;
2632+
ui64 taskId = ev->Get()->Record.GetTaskId();
2633+
2634+
Planner->CompletedCA(taskId, actor);
2635+
2636+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2637+
DoShutdown();
2638+
}
2639+
} else {
2640+
// TODO: handle another states.
2641+
}
2642+
}
2643+
2644+
void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2645+
const auto nodeId = ev->Get()->NodeId;
2646+
LOG_N("Node has disconnected while shutdown: " << nodeId);
2647+
2648+
YQL_ENSURE(Planner);
2649+
2650+
for (const auto& task : TasksGraph.GetTasks()) {
2651+
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
2652+
if (task.ComputeActorId) {
2653+
Planner->CompletedCA(task.Id, task.ComputeActorId);
2654+
} else {
2655+
Planner->TaskNotStarted(task.Id);
2656+
}
2657+
}
2658+
}
2659+
2660+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2661+
DoShutdown();
2662+
}
2663+
}
2664+
2665+
void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
2666+
// Self-poison means timeout - don't wait anymore.
2667+
if (ev->Sender == SelfId()) {
2668+
DoShutdown();
2669+
}
2670+
}
2671+
2672+
void DoShutdown() {
25992673
auto totalTime = TInstant::Now() - StartTime;
26002674
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
26012675

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
667667
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
668668
InternalError(issues);
669669
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
670-
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
670+
TimeoutError(ev->Sender);
671671
} else {
672672
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
673673
}
@@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16241624
protected:
16251625
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16261626
for (const auto& task : this->TasksGraph.GetTasks()) {
1627-
if (task.ComputeActorId) {
1627+
if (task.ComputeActorId && !task.Meta.Completed) {
16281628
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
16291629
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
16301630

16311631
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
16321632
this->Send(task.ComputeActorId, ev.Release());
16331633
} else {
1634-
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
1634+
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
16351635
}
16361636
}
16371637
}
@@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16491649

16501650
void InternalError(const NYql::TIssues& issues) {
16511651
LOG_E(issues.ToOneLineString());
1652-
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
16531652
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
16541653
for (const NYql::TIssue& i : issues) {
16551654
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1663,15 +1662,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16631662

16641663
void ReplyUnavailable(const TString& message) {
16651664
LOG_E("UNAVAILABLE: " << message);
1666-
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
16671665
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
16681666
issue.AddSubIssue(new NYql::TIssue(message));
16691667
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
16701668
}
16711669

16721670
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16731671
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
1674-
TerminateComputeActors(code, issues);
16751672
ReplyErrorAndDie(code, issues);
16761673
}
16771674

@@ -1687,11 +1684,18 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16871684
ReplyErrorAndDie(status, &issues);
16881685
}
16891686

1690-
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
1687+
void TimeoutError(TActorId abortSender) {
16911688
if (AlreadyReplied) {
16921689
return;
16931690
}
16941691

1692+
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1693+
const TString message = "Request timeout exceeded";
1694+
1695+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1696+
1697+
AlreadyReplied = true;
1698+
16951699
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
16961700
if (ExecuterSpan) {
16971701
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1701,16 +1705,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17011705

17021706
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17031707
if (abortSender != Target) {
1704-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
1708+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
17051709
this->Send(Target, abortEv.Release());
17061710
}
17071711

1708-
AlreadyReplied = true;
17091712
LOG_E("Sending timeout response to: " << Target);
1710-
this->Send(Target, ResponseEv.release());
17111713

17121714
Request.Transactions.crop(0);
1713-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
17141715
this->PassAway();
17151716
}
17161717

@@ -1749,14 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17491750
return;
17501751
}
17511752

1752-
if (Planner) {
1753-
for (auto computeActor : Planner->GetPendingComputeActors()) {
1754-
LOG_D("terminate compute actor " << computeActor.first);
1755-
1756-
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
1757-
this->Send(computeActor.first, ev.Release());
1758-
}
1759-
}
1753+
TerminateComputeActors(status, "Terminate execution");
17601754

17611755
AlreadyReplied = true;
17621756
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1782,7 +1776,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17821776
ExecuterStateSpan.EndError(response.DebugString());
17831777

17841778
Request.Transactions.crop(0);
1785-
this->Send(Target, ResponseEv.release());
17861779
this->PassAway();
17871780
}
17881781

@@ -1852,6 +1845,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18521845

18531846
protected:
18541847
void PassAway() override {
1848+
YQL_ENSURE(ResponseEv);
1849+
this->Send(Target, ResponseEv.release());
1850+
18551851
for (auto channelPair: ResultChannelProxies) {
18561852
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
18571853

@@ -1872,12 +1868,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18721868

18731869
if (KqpTableResolverId) {
18741870
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
1875-
this->Send(this->SelfId(), new TEvents::TEvPoison);
1876-
LOG_T("Terminate, become ZombieState");
1877-
this->Become(&TKqpExecuterBase::ZombieState);
1878-
} else {
1879-
IActor::PassAway();
18801871
}
1872+
1873+
this->Send(this->SelfId(), new TEvents::TEvPoison);
1874+
LOG_T("Terminate, become ZombieState");
1875+
this->Become(&TKqpExecuterBase::ZombieState);
18811876
}
18821877

18831878
STATEFN(ZombieState) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
554554
YQL_ENSURE(it != PendingComputeActors.end());
555555
LastStats.emplace_back(std::move(it->second));
556556
PendingComputeActors.erase(it);
557-
return;
557+
558+
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
559+
}
560+
561+
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
562+
// NOTE: should be invoked only while shutting down - when node is disconnected.
563+
564+
auto& task = TasksGraph.GetTask(taskId);
565+
566+
YQL_ENSURE(!task.ComputeActorId);
567+
YQL_ENSURE(!task.Meta.Completed);
568+
569+
PendingComputeTasks.erase(taskId);
558570
}
559571

560572
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TKqpPlanner {
7474
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7575
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
7676
void CompletedCA(ui64 taskId, TActorId computeActor);
77+
void TaskNotStarted(ui64 taskId);
7778
TProgressStat::TEntry CalculateConsumptionUpdate();
7879
void ShiftConsumption();
7980
void Submit();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
273273
public:
274274

275275
void Finalize() {
276+
YQL_ENSURE(!AlreadyReplied);
277+
AlreadyReplied = true;
278+
276279
FillResponseStats(Ydb::StatusIds::SUCCESS);
277280

278281
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
281284
ExecuterSpan.EndOk();
282285
}
283286

284-
LOG_D("Sending response to: " << Target);
285-
Send(Target, ResponseEv.release());
286287
PassAway();
287288
}
288289

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,18 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
515515
RuntimeSettings.TerminateHandler(success, issues);
516516
}
517517

518+
// Send final state to executer to inform about termination.
519+
{
520+
auto ev = MakeHolder<TEvDqCompute::TEvState>();
521+
auto& record = ev->Record;
522+
523+
record.SetState(NDqProto::COMPUTE_STATE_FINISHED);
524+
record.SetStatusCode(NDqProto::StatusIds::ABORTED);
525+
record.SetTaskId(Task.GetId());
526+
527+
this->Send(ExecuterId, ev.Release());
528+
}
529+
518530
this->PassAway();
519531
Terminated = true;
520532
}

0 commit comments

Comments
 (0)