Skip to content

Commit 77f9adc

Browse files
authored
Merge 89671e1 into 96075e8
2 parents 96075e8 + 89671e1 commit 77f9adc

File tree

6 files changed

+130
-30
lines changed

6 files changed

+130
-30
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
205205
}
206206

207207
void Finalize() {
208+
YQL_ENSURE(!AlreadyReplied);
209+
208210
if (LocksBroken) {
209211
TString message = "Transaction locks invalidated.";
210212

@@ -278,8 +280,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
278280
ExecuterSpan.EndOk();
279281

280282
Request.Transactions.crop(0);
281-
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
282-
Send(Target, ResponseEv.release());
283+
AlreadyReplied = true;
283284
PassAway();
284285
}
285286

@@ -319,6 +320,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
319320
return "WaitSnapshotState";
320321
} else if (func == &TThis::WaitResolveState) {
321322
return "WaitResolveState";
323+
} else if (func == &TThis::WaitShutdownState) {
324+
return "WaitShutdownState";
322325
} else {
323326
return TBase::CurrentStateFuncName();
324327
}
@@ -2596,6 +2599,80 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
25962599
}
25972600

25982601
void PassAway() override {
2602+
if (Planner) {
2603+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2604+
DoShutdown();
2605+
} else {
2606+
this->Become(&TThis::WaitShutdownState);
2607+
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2608+
<< Planner->GetPendingComputeActors().size() << " compute actors");
2609+
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
2610+
}
2611+
} else {
2612+
DoShutdown();
2613+
}
2614+
}
2615+
2616+
STATEFN(WaitShutdownState) {
2617+
switch(ev->GetTypeRewrite()) {
2618+
hFunc(TEvDqCompute::TEvState, HandleShutdown);
2619+
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2620+
hFunc(TEvents::TEvPoison, HandleShutdown);
2621+
default:
2622+
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
2623+
}
2624+
}
2625+
2626+
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2627+
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FINISHED) {
2628+
YQL_ENSURE(Planner);
2629+
2630+
if (ev->Get()->Record.GetStatusCode() != NDqProto::StatusIds::ABORTED) {
2631+
return;
2632+
}
2633+
2634+
TActorId actor = ev->Sender;
2635+
ui64 taskId = ev->Get()->Record.GetTaskId();
2636+
2637+
Planner->CompletedCA(taskId, actor);
2638+
2639+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2640+
DoShutdown();
2641+
}
2642+
} else {
2643+
// TODO: handle other states.
2644+
}
2645+
}
2646+
2647+
void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2648+
const auto nodeId = ev->Get()->NodeId;
2649+
LOG_N("Node has disconnected while shutdown: " << nodeId);
2650+
2651+
YQL_ENSURE(Planner);
2652+
2653+
for (const auto& task : TasksGraph.GetTasks()) {
2654+
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
2655+
if (task.ComputeActorId) {
2656+
Planner->CompletedCA(task.Id, task.ComputeActorId);
2657+
} else {
2658+
Planner->TaskNotStarted(task.Id);
2659+
}
2660+
}
2661+
}
2662+
2663+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2664+
DoShutdown();
2665+
}
2666+
}
2667+
2668+
void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
2669+
// Self-poison means timeout - don't wait anymore.
2670+
if (ev->Sender == SelfId()) {
2671+
DoShutdown();
2672+
}
2673+
}
2674+
2675+
void DoShutdown() {
25992676
auto totalTime = TInstant::Now() - StartTime;
26002677
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
26012678

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
667667
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
668668
InternalError(issues);
669669
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
670-
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
670+
TimeoutError(ev->Sender);
671671
} else {
672672
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
673673
}
@@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16241624
protected:
16251625
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16261626
for (const auto& task : this->TasksGraph.GetTasks()) {
1627-
if (task.ComputeActorId) {
1627+
if (task.ComputeActorId && !task.Meta.Completed) {
16281628
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
16291629
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
16301630

16311631
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
16321632
this->Send(task.ComputeActorId, ev.Release());
16331633
} else {
1634-
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
1634+
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
16351635
}
16361636
}
16371637
}
@@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16491649

16501650
void InternalError(const NYql::TIssues& issues) {
16511651
LOG_E(issues.ToOneLineString());
1652-
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
16531652
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
16541653
for (const NYql::TIssue& i : issues) {
16551654
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1663,15 +1662,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16631662

16641663
void ReplyUnavailable(const TString& message) {
16651664
LOG_E("UNAVAILABLE: " << message);
1666-
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
16671665
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
16681666
issue.AddSubIssue(new NYql::TIssue(message));
16691667
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
16701668
}
16711669

16721670
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16731671
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
1674-
TerminateComputeActors(code, issues);
16751672
ReplyErrorAndDie(code, issues);
16761673
}
16771674

@@ -1687,11 +1684,19 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16871684
ReplyErrorAndDie(status, &issues);
16881685
}
16891686

1690-
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
1687+
void TimeoutError(TActorId abortSender) {
16911688
if (AlreadyReplied) {
1689+
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
16921690
return;
16931691
}
16941692

1693+
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1694+
const TString message = "Request timeout exceeded";
1695+
1696+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1697+
1698+
AlreadyReplied = true;
1699+
16951700
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
16961701
if (ExecuterSpan) {
16971702
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1701,16 +1706,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17011706

17021707
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17031708
if (abortSender != Target) {
1704-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
1709+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
17051710
this->Send(Target, abortEv.Release());
17061711
}
17071712

1708-
AlreadyReplied = true;
17091713
LOG_E("Sending timeout response to: " << Target);
1710-
this->Send(Target, ResponseEv.release());
17111714

17121715
Request.Transactions.crop(0);
1713-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
17141716
this->PassAway();
17151717
}
17161718

@@ -1746,17 +1748,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17461748
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17471749
{
17481750
if (AlreadyReplied) {
1751+
LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17491752
return;
17501753
}
17511754

1752-
if (Planner) {
1753-
for (auto computeActor : Planner->GetPendingComputeActors()) {
1754-
LOG_D("terminate compute actor " << computeActor.first);
1755-
1756-
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
1757-
this->Send(computeActor.first, ev.Release());
1758-
}
1759-
}
1755+
TerminateComputeActors(status, "Terminate execution");
17601756

17611757
AlreadyReplied = true;
17621758
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1782,7 +1778,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17821778
ExecuterStateSpan.EndError(response.DebugString());
17831779

17841780
Request.Transactions.crop(0);
1785-
this->Send(Target, ResponseEv.release());
17861781
this->PassAway();
17871782
}
17881783

@@ -1852,6 +1847,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18521847

18531848
protected:
18541849
void PassAway() override {
1850+
YQL_ENSURE(AlreadyReplied && ResponseEv);
1851+
this->Send(Target, ResponseEv.release());
1852+
18551853
for (auto channelPair: ResultChannelProxies) {
18561854
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
18571855

@@ -1872,12 +1870,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18721870

18731871
if (KqpTableResolverId) {
18741872
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
1875-
this->Send(this->SelfId(), new TEvents::TEvPoison);
1876-
LOG_T("Terminate, become ZombieState");
1877-
this->Become(&TKqpExecuterBase::ZombieState);
1878-
} else {
1879-
IActor::PassAway();
18801873
}
1874+
1875+
this->Send(this->SelfId(), new TEvents::TEvPoison);
1876+
LOG_T("Terminate, become ZombieState");
1877+
this->Become(&TKqpExecuterBase::ZombieState);
18811878
}
18821879

18831880
STATEFN(ZombieState) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
554554
YQL_ENSURE(it != PendingComputeActors.end());
555555
LastStats.emplace_back(std::move(it->second));
556556
PendingComputeActors.erase(it);
557-
return;
557+
558+
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
559+
}
560+
561+
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
562+
// NOTE: should be invoked only while shutting down - when node is disconnected.
563+
564+
auto& task = TasksGraph.GetTask(taskId);
565+
566+
YQL_ENSURE(!task.ComputeActorId);
567+
YQL_ENSURE(!task.Meta.Completed);
568+
569+
PendingComputeTasks.erase(taskId);
558570
}
559571

560572
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class TKqpPlanner {
7474
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7575
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
7676
void CompletedCA(ui64 taskId, TActorId computeActor);
77+
void TaskNotStarted(ui64 taskId);
7778
TProgressStat::TEntry CalculateConsumptionUpdate();
7879
void ShiftConsumption();
7980
void Submit();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
273273
public:
274274

275275
void Finalize() {
276+
YQL_ENSURE(!AlreadyReplied);
277+
AlreadyReplied = true;
278+
276279
FillResponseStats(Ydb::StatusIds::SUCCESS);
277280

278281
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
281284
ExecuterSpan.EndOk();
282285
}
283286

284-
LOG_D("Sending response to: " << Target);
285-
Send(Target, ResponseEv.release());
286287
PassAway();
287288
}
288289

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,18 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
515515
RuntimeSettings.TerminateHandler(success, issues);
516516
}
517517

518+
// Send final state to executer to inform about termination.
519+
if (!success) {
520+
auto ev = MakeHolder<TEvDqCompute::TEvState>();
521+
auto& record = ev->Record;
522+
523+
record.SetState(NDqProto::COMPUTE_STATE_FINISHED);
524+
record.SetStatusCode(NDqProto::StatusIds::ABORTED);
525+
record.SetTaskId(Task.GetId());
526+
527+
this->Send(ExecuterId, ev.Release());
528+
}
529+
518530
this->PassAway();
519531
Terminated = true;
520532
}

0 commit comments

Comments
 (0)