Skip to content

Commit fe50fe3

Browse files
authored
Wait for all CAs inside Executer before shutdown (ydb-platform#7829)
1 parent 1869eed commit fe50fe3

File tree

8 files changed

+257
-48
lines changed

8 files changed

+257
-48
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ __pycache__/
2626
*.pb.h
2727
*.pb.cc
2828

29+
# Other generated
30+
*.fbs.h
31+
2932
# MacOS specific
3033
.DS_Store
3134

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+76-2
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
205205
}
206206

207207
void Finalize() {
208+
YQL_ENSURE(!AlreadyReplied);
209+
208210
if (LocksBroken) {
209211
TString message = "Transaction locks invalidated.";
210212

@@ -278,8 +280,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
278280
ExecuterSpan.EndOk();
279281

280282
Request.Transactions.crop(0);
281-
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
282-
Send(Target, ResponseEv.release());
283+
AlreadyReplied = true;
283284
PassAway();
284285
}
285286

@@ -319,6 +320,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
319320
return "WaitSnapshotState";
320321
} else if (func == &TThis::WaitResolveState) {
321322
return "WaitResolveState";
323+
} else if (func == &TThis::WaitShutdownState) {
324+
return "WaitShutdownState";
322325
} else {
323326
return TBase::CurrentStateFuncName();
324327
}
@@ -2595,6 +2598,22 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25952598
}
25962599
}
25972600

2601+
void Shutdown() override {
2602+
if (Planner) {
2603+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2604+
LOG_I("Shutdown immediately - nothing to wait");
2605+
PassAway();
2606+
} else {
2607+
this->Become(&TThis::WaitShutdownState);
2608+
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2609+
<< Planner->GetPendingComputeActors().size() << " compute actors");
2610+
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
2611+
}
2612+
} else {
2613+
PassAway();
2614+
}
2615+
}
2616+
25982617
void PassAway() override {
25992618
auto totalTime = TInstant::Now() - StartTime;
26002619
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
@@ -2612,6 +2631,61 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26122631
TBase::PassAway();
26132632
}
26142633

2634+
STATEFN(WaitShutdownState) {
2635+
switch(ev->GetTypeRewrite()) {
2636+
hFunc(TEvDqCompute::TEvState, HandleShutdown);
2637+
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2638+
hFunc(TEvents::TEvPoison, HandleShutdown);
2639+
default:
2640+
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
2641+
}
2642+
}
2643+
2644+
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2645+
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2646+
YQL_ENSURE(Planner);
2647+
2648+
TActorId actor = ev->Sender;
2649+
ui64 taskId = ev->Get()->Record.GetTaskId();
2650+
2651+
Planner->CompletedCA(taskId, actor);
2652+
2653+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2654+
PassAway();
2655+
}
2656+
}
2657+
}
2658+
2659+
void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2660+
const auto nodeId = ev->Get()->NodeId;
2661+
LOG_N("Node has disconnected while shutdown: " << nodeId);
2662+
2663+
YQL_ENSURE(Planner);
2664+
2665+
for (const auto& task : TasksGraph.GetTasks()) {
2666+
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
2667+
if (task.ComputeActorId) {
2668+
Planner->CompletedCA(task.Id, task.ComputeActorId);
2669+
} else {
2670+
Planner->TaskNotStarted(task.Id);
2671+
}
2672+
}
2673+
}
2674+
2675+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2676+
PassAway();
2677+
}
2678+
}
2679+
2680+
void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
2681+
// Self-poison means timeout - don't wait anymore.
2682+
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");
2683+
2684+
if (ev->Sender == SelfId()) {
2685+
PassAway();
2686+
}
2687+
}
2688+
26152689
private:
26162690
void ReplyTxStateUnknown(ui64 shardId) {
26172691
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

+30-27
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
667667
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
668668
InternalError(issues);
669669
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
670-
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
670+
TimeoutError(ev->Sender);
671671
} else {
672672
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
673673
}
@@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16241624
protected:
16251625
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16261626
for (const auto& task : this->TasksGraph.GetTasks()) {
1627-
if (task.ComputeActorId) {
1627+
if (task.ComputeActorId && !task.Meta.Completed) {
16281628
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
16291629
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
16301630

16311631
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
16321632
this->Send(task.ComputeActorId, ev.Release());
16331633
} else {
1634-
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
1634+
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
16351635
}
16361636
}
16371637
}
@@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16491649

16501650
void InternalError(const NYql::TIssues& issues) {
16511651
LOG_E(issues.ToOneLineString());
1652-
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
16531652
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
16541653
for (const NYql::TIssue& i : issues) {
16551654
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1663,15 +1662,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16631662

16641663
void ReplyUnavailable(const TString& message) {
16651664
LOG_E("UNAVAILABLE: " << message);
1666-
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
16671665
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
16681666
issue.AddSubIssue(new NYql::TIssue(message));
16691667
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
16701668
}
16711669

16721670
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16731671
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
1674-
TerminateComputeActors(code, issues);
16751672
ReplyErrorAndDie(code, issues);
16761673
}
16771674

@@ -1687,11 +1684,19 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16871684
ReplyErrorAndDie(status, &issues);
16881685
}
16891686

1690-
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
1687+
void TimeoutError(TActorId abortSender) {
16911688
if (AlreadyReplied) {
1689+
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
16921690
return;
16931691
}
16941692

1693+
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1694+
const TString message = "Request timeout exceeded";
1695+
1696+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1697+
1698+
AlreadyReplied = true;
1699+
16951700
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
16961701
if (ExecuterSpan) {
16971702
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1701,17 +1706,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17011706

17021707
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17031708
if (abortSender != Target) {
1704-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
1709+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
17051710
this->Send(Target, abortEv.Release());
17061711
}
17071712

1708-
AlreadyReplied = true;
17091713
LOG_E("Sending timeout response to: " << Target);
1710-
this->Send(Target, ResponseEv.release());
17111714

17121715
Request.Transactions.crop(0);
1713-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1714-
this->PassAway();
1716+
this->Shutdown();
17151717
}
17161718

17171719
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
@@ -1746,17 +1748,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17461748
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17471749
{
17481750
if (AlreadyReplied) {
1751+
LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17491752
return;
17501753
}
17511754

1752-
if (Planner) {
1753-
for (auto computeActor : Planner->GetPendingComputeActors()) {
1754-
LOG_D("terminate compute actor " << computeActor.first);
1755-
1756-
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
1757-
this->Send(computeActor.first, ev.Release());
1758-
}
1759-
}
1755+
TerminateComputeActors(status, "Terminate execution");
17601756

17611757
AlreadyReplied = true;
17621758
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1782,8 +1778,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17821778
ExecuterStateSpan.EndError(response.DebugString());
17831779

17841780
Request.Transactions.crop(0);
1785-
this->Send(Target, ResponseEv.release());
1786-
this->PassAway();
1781+
this->Shutdown();
17871782
}
17881783

17891784
protected:
@@ -1851,7 +1846,16 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18511846
}
18521847

18531848
protected:
1849+
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
1850+
// that `PassAway()` should kill actor immediately.
1851+
virtual void Shutdown() {
1852+
PassAway();
1853+
}
1854+
18541855
void PassAway() override {
1856+
YQL_ENSURE(AlreadyReplied && ResponseEv);
1857+
this->Send(Target, ResponseEv.release());
1858+
18551859
for (auto channelPair: ResultChannelProxies) {
18561860
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
18571861

@@ -1872,12 +1876,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18721876

18731877
if (KqpTableResolverId) {
18741878
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
1875-
this->Send(this->SelfId(), new TEvents::TEvPoison);
1876-
LOG_T("Terminate, become ZombieState");
1877-
this->Become(&TKqpExecuterBase::ZombieState);
1878-
} else {
1879-
IActor::PassAway();
18801879
}
1880+
1881+
this->Send(this->SelfId(), new TEvents::TEvPoison);
1882+
LOG_T("Terminate, become ZombieState");
1883+
this->Become(&TKqpExecuterBase::ZombieState);
18811884
}
18821885

18831886
STATEFN(ZombieState) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

+13-1
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
554554
YQL_ENSURE(it != PendingComputeActors.end());
555555
LastStats.emplace_back(std::move(it->second));
556556
PendingComputeActors.erase(it);
557-
return;
557+
558+
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
559+
}
560+
561+
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
562+
// NOTE: should be invoked only while shutting down - when node is disconnected.
563+
564+
auto& task = TasksGraph.GetTask(taskId);
565+
566+
YQL_ENSURE(!task.ComputeActorId);
567+
YQL_ENSURE(!task.Meta.Completed);
568+
569+
PendingComputeTasks.erase(taskId);
558570
}
559571

560572
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {

ydb/core/kqp/executer_actor/kqp_planner.h

+1
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TKqpPlanner {
7474
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7575
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
7676
void CompletedCA(ui64 taskId, TActorId computeActor);
77+
void TaskNotStarted(ui64 taskId);
7778
TProgressStat::TEntry CalculateConsumptionUpdate();
7879
void ShiftConsumption();
7980
void Submit();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
273273
public:
274274

275275
void Finalize() {
276+
YQL_ENSURE(!AlreadyReplied);
277+
AlreadyReplied = true;
278+
276279
FillResponseStats(Ydb::StatusIds::SUCCESS);
277280

278281
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
281284
ExecuterSpan.EndOk();
282285
}
283286

284-
LOG_D("Sending response to: " << Target);
285-
Send(Target, ResponseEv.release());
286287
PassAway();
287288
}
288289

0 commit comments

Comments
 (0)