Skip to content

Commit 00fe6e5

Browse files
committed
Revert "Wait for all CAs inside Executer before shutdown (ydb-platform#7829)"
This reverts commit fe50fe3.
1 parent 88a9d6c commit 00fe6e5

File tree

8 files changed

+48
-257
lines changed

8 files changed

+48
-257
lines changed

.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ __pycache__/
2828
*.pb.h
2929
*.pb.cc
3030

31-
# Other generated
32-
*.fbs.h
33-
3431
# MacOS specific
3532
.DS_Store
3633

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 2 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
198198
}
199199

200200
void Finalize() {
201-
YQL_ENSURE(!AlreadyReplied);
202-
203201
if (LocksBroken) {
204202
YQL_ENSURE(ResponseEv->BrokenLockShardId);
205203
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
@@ -281,7 +279,8 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
281279
ExecuterSpan.EndOk();
282280

283281
Request.Transactions.crop(0);
284-
AlreadyReplied = true;
282+
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
283+
Send(Target, ResponseEv.release());
285284
PassAway();
286285
}
287286

@@ -321,8 +320,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
321320
return "WaitSnapshotState";
322321
} else if (func == &TThis::WaitResolveState) {
323322
return "WaitResolveState";
324-
} else if (func == &TThis::WaitShutdownState) {
325-
return "WaitShutdownState";
326323
} else {
327324
return TBase::CurrentStateFuncName();
328325
}
@@ -2636,22 +2633,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26362633
}
26372634
}
26382635

2639-
void Shutdown() override {
2640-
if (Planner) {
2641-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2642-
LOG_I("Shutdown immediately - nothing to wait");
2643-
PassAway();
2644-
} else {
2645-
this->Become(&TThis::WaitShutdownState);
2646-
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2647-
<< Planner->GetPendingComputeActors().size() << " compute actors");
2648-
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
2649-
}
2650-
} else {
2651-
PassAway();
2652-
}
2653-
}
2654-
26552636
void PassAway() override {
26562637
auto totalTime = TInstant::Now() - StartTime;
26572638
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
@@ -2669,61 +2650,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26692650
TBase::PassAway();
26702651
}
26712652

2672-
STATEFN(WaitShutdownState) {
2673-
switch (ev->GetTypeRewrite()) {
2674-
hFunc(TEvDqCompute::TEvState, HandleShutdown);
2675-
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2676-
hFunc(TEvents::TEvPoison, HandleShutdown);
2677-
default:
2678-
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
2679-
}
2680-
}
2681-
2682-
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2683-
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2684-
YQL_ENSURE(Planner);
2685-
2686-
TActorId actor = ev->Sender;
2687-
ui64 taskId = ev->Get()->Record.GetTaskId();
2688-
2689-
Planner->CompletedCA(taskId, actor);
2690-
2691-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2692-
PassAway();
2693-
}
2694-
}
2695-
}
2696-
2697-
void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2698-
const auto nodeId = ev->Get()->NodeId;
2699-
LOG_N("Node has disconnected while shutdown: " << nodeId);
2700-
2701-
YQL_ENSURE(Planner);
2702-
2703-
for (const auto& task : TasksGraph.GetTasks()) {
2704-
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
2705-
if (task.ComputeActorId) {
2706-
Planner->CompletedCA(task.Id, task.ComputeActorId);
2707-
} else {
2708-
Planner->TaskNotStarted(task.Id);
2709-
}
2710-
}
2711-
}
2712-
2713-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2714-
PassAway();
2715-
}
2716-
}
2717-
2718-
void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
2719-
// Self-poison means timeout - don't wait anymore.
2720-
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");
2721-
2722-
if (ev->Sender == SelfId()) {
2723-
PassAway();
2724-
}
2725-
}
2726-
27272653
private:
27282654
void ReplyTxStateUnknown(ui64 shardId) {
27292655
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
668668
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
669669
InternalError(issues);
670670
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
671-
TimeoutError(ev->Sender);
671+
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
672672
} else {
673673
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
674674
}
@@ -1601,14 +1601,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16011601
protected:
16021602
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16031603
for (const auto& task : this->TasksGraph.GetTasks()) {
1604-
if (task.ComputeActorId && !task.Meta.Completed) {
1604+
if (task.ComputeActorId) {
16051605
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
16061606
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
16071607

16081608
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
16091609
this->Send(task.ComputeActorId, ev.Release());
16101610
} else {
1611-
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
1611+
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
16121612
}
16131613
}
16141614
}
@@ -1626,6 +1626,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16261626

16271627
void InternalError(const NYql::TIssues& issues) {
16281628
LOG_E(issues.ToOneLineString());
1629+
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
16291630
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
16301631
for (const NYql::TIssue& i : issues) {
16311632
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1639,13 +1640,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16391640

16401641
void ReplyUnavailable(const TString& message) {
16411642
LOG_E("UNAVAILABLE: " << message);
1643+
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
16421644
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
16431645
issue.AddSubIssue(new NYql::TIssue(message));
16441646
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
16451647
}
16461648

16471649
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16481650
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
1651+
TerminateComputeActors(code, issues);
16491652
ReplyErrorAndDie(code, issues);
16501653
}
16511654

@@ -1661,19 +1664,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16611664
ReplyErrorAndDie(status, &issues);
16621665
}
16631666

1664-
void TimeoutError(TActorId abortSender) {
1667+
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
16651668
if (AlreadyReplied) {
1666-
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
16671669
return;
16681670
}
16691671

1670-
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1671-
const TString message = "Request timeout exceeded";
1672-
1673-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1674-
1675-
AlreadyReplied = true;
1676-
16771672
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
16781673
if (ExecuterSpan) {
16791674
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1683,14 +1678,17 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16831678

16841679
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
16851680
if (abortSender != Target) {
1686-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
1681+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
16871682
this->Send(Target, abortEv.Release());
16881683
}
16891684

1685+
AlreadyReplied = true;
16901686
LOG_E("Sending timeout response to: " << Target);
1687+
this->Send(Target, ResponseEv.release());
16911688

16921689
Request.Transactions.crop(0);
1693-
this->Shutdown();
1690+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1691+
this->PassAway();
16941692
}
16951693

16961694
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
@@ -1725,11 +1723,17 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17251723
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17261724
{
17271725
if (AlreadyReplied) {
1728-
LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17291726
return;
17301727
}
17311728

1732-
TerminateComputeActors(status, "Terminate execution");
1729+
if (Planner) {
1730+
for (auto computeActor : Planner->GetPendingComputeActors()) {
1731+
LOG_D("terminate compute actor " << computeActor.first);
1732+
1733+
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
1734+
this->Send(computeActor.first, ev.Release());
1735+
}
1736+
}
17331737

17341738
AlreadyReplied = true;
17351739
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1756,7 +1760,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17561760
ExecuterStateSpan.EndError(response.DebugString());
17571761

17581762
Request.Transactions.crop(0);
1759-
this->Shutdown();
1763+
this->Send(Target, ResponseEv.release());
1764+
this->PassAway();
17601765
}
17611766

17621767
protected:
@@ -1824,16 +1829,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18241829
}
18251830

18261831
protected:
1827-
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
1828-
// that `PassAway()` should kill actor immediately.
1829-
virtual void Shutdown() {
1830-
PassAway();
1831-
}
1832-
18331832
void PassAway() override {
1834-
YQL_ENSURE(AlreadyReplied && ResponseEv);
1835-
this->Send(Target, ResponseEv.release());
1836-
18371833
for (auto channelPair: ResultChannelProxies) {
18381834
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
18391835

@@ -1854,11 +1850,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18541850

18551851
if (KqpTableResolverId) {
18561852
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
1853+
this->Send(this->SelfId(), new TEvents::TEvPoison);
1854+
LOG_T("Terminate, become ZombieState");
1855+
this->Become(&TKqpExecuterBase::ZombieState);
1856+
} else {
1857+
IActor::PassAway();
18571858
}
1858-
1859-
this->Send(this->SelfId(), new TEvents::TEvPoison);
1860-
LOG_T("Terminate, become ZombieState");
1861-
this->Become(&TKqpExecuterBase::ZombieState);
18621859
}
18631860

18641861
STATEFN(ZombieState) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -638,19 +638,7 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
638638
YQL_ENSURE(it != PendingComputeActors.end());
639639
LastStats.emplace_back(std::move(it->second));
640640
PendingComputeActors.erase(it);
641-
642-
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
643-
}
644-
645-
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
646-
// NOTE: should be invoked only while shutting down - when node is disconnected.
647-
648-
auto& task = TasksGraph.GetTask(taskId);
649-
650-
YQL_ENSURE(!task.ComputeActorId);
651-
YQL_ENSURE(!task.Meta.Completed);
652-
653-
PendingComputeTasks.erase(taskId);
641+
return;
654642
}
655643

656644
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class TKqpPlanner {
7676
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7777
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
7878
void CompletedCA(ui64 taskId, TActorId computeActor);
79-
void TaskNotStarted(ui64 taskId);
8079
TProgressStat::TEntry CalculateConsumptionUpdate();
8180
void ShiftConsumption();
8281
void Submit();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
273273
public:
274274

275275
void Finalize() {
276-
YQL_ENSURE(!AlreadyReplied);
277-
AlreadyReplied = true;
278-
279276
FillResponseStats(Ydb::StatusIds::SUCCESS);
280277

281278
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -284,6 +281,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
284281
ExecuterSpan.EndOk();
285282
}
286283

284+
LOG_D("Sending response to: " << Target);
285+
Send(Target, ResponseEv.release());
287286
PassAway();
288287
}
289288

0 commit comments

Comments
 (0)