Skip to content

Commit cc79492

Browse files
committed
Revert "Wait for all CAs inside Executer before shutdown (ydb-platform#7829)"
This reverts commit fe50fe3.
1 parent dce7cdd commit cc79492

File tree

8 files changed

+52
-281
lines changed

8 files changed

+52
-281
lines changed

.gitignore

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ __pycache__/
2828
*.pb.h
2929
*.pb.cc
3030

31-
# Other generated
32-
*.fbs.h
33-
3431
# MacOS specific
3532
.DS_Store
3633

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 4 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
198198
}
199199

200200
void Finalize() {
201-
YQL_ENSURE(!AlreadyReplied);
202-
203201
if (LocksBroken) {
204202
YQL_ENSURE(ResponseEv->BrokenLockShardId);
205203
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
@@ -278,7 +276,9 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
278276

279277
ExecuterSpan.EndOk();
280278

281-
AlreadyReplied = true;
279+
Request.Transactions.crop(0);
280+
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
281+
Send(Target, ResponseEv.release());
282282
PassAway();
283283
}
284284

@@ -318,8 +318,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
318318
return "WaitSnapshotState";
319319
} else if (func == &TThis::WaitResolveState) {
320320
return "WaitResolveState";
321-
} else if (func == &TThis::WaitShutdownState) {
322-
return "WaitShutdownState";
323321
} else {
324322
return TBase::CurrentStateFuncName();
325323
}
@@ -2255,7 +2253,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
22552253
// Volatile transactions must always use generic readsets
22562254
VolatileTx ||
22572255
// Transactions with topics must always use generic readsets
2258-
!topicTxs.empty() ||
2256+
!topicTxs.empty() ||
22592257
// HTAP transactions always use generic readsets
22602258
!evWriteTxs.empty());
22612259

@@ -2633,23 +2631,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26332631
}
26342632
}
26352633

2636-
void Shutdown() override {
2637-
if (Planner) {
2638-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2639-
LOG_I("Shutdown immediately - nothing to wait");
2640-
PassAway();
2641-
} else {
2642-
this->Become(&TThis::WaitShutdownState);
2643-
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
2644-
<< Planner->GetPendingComputeActors().size() << " compute actors");
2645-
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
2646-
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
2647-
}
2648-
} else {
2649-
PassAway();
2650-
}
2651-
}
2652-
26532634
void PassAway() override {
26542635
auto totalTime = TInstant::Now() - StartTime;
26552636
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
@@ -2667,54 +2648,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
26672648
TBase::PassAway();
26682649
}
26692650

2670-
STATEFN(WaitShutdownState) {
2671-
switch (ev->GetTypeRewrite()) {
2672-
hFunc(TEvDqCompute::TEvState, HandleShutdown);
2673-
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2674-
hFunc(TEvents::TEvPoison, HandleShutdown);
2675-
default:
2676-
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
2677-
}
2678-
}
2679-
2680-
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2681-
HandleComputeStats(ev);
2682-
2683-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2684-
PassAway();
2685-
}
2686-
}
2687-
2688-
void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2689-
const auto nodeId = ev->Get()->NodeId;
2690-
LOG_N("Node has disconnected while shutdown: " << nodeId);
2691-
2692-
YQL_ENSURE(Planner);
2693-
2694-
for (const auto& task : TasksGraph.GetTasks()) {
2695-
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
2696-
if (task.ComputeActorId) {
2697-
Planner->CompletedCA(task.Id, task.ComputeActorId);
2698-
} else {
2699-
Planner->TaskNotStarted(task.Id);
2700-
}
2701-
}
2702-
}
2703-
2704-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2705-
PassAway();
2706-
}
2707-
}
2708-
2709-
void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
2710-
// Self-poison means timeout - don't wait anymore.
2711-
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");
2712-
2713-
if (ev->Sender == SelfId()) {
2714-
PassAway();
2715-
}
2716-
}
2717-
27182651
private:
27192652
void ReplyTxStateUnknown(ui64 shardId) {
27202653
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 30 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418418
auto& extraData = ExtraData[computeActor];
419419
extraData.TaskId = taskId;
420420
extraData.Data.Swap(state.MutableExtraData());
421-
421+
422422

423423
Stats->AddComputeActorStats(
424424
computeActor.NodeId(),
@@ -686,7 +686,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
686686
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
687687
InternalError(issues);
688688
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
689-
TimeoutError(ev->Sender);
689+
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
690690
} else {
691691
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
692692
}
@@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16241624
protected:
16251625
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16261626
for (const auto& task : this->TasksGraph.GetTasks()) {
1627-
if (task.ComputeActorId && !task.Meta.Completed) {
1627+
if (task.ComputeActorId) {
16281628
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
16291629
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
16301630

16311631
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
16321632
this->Send(task.ComputeActorId, ev.Release());
16331633
} else {
1634-
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
1634+
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
16351635
}
16361636
}
16371637
}
@@ -1649,6 +1649,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16491649

16501650
void InternalError(const NYql::TIssues& issues) {
16511651
LOG_E(issues.ToOneLineString());
1652+
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
16521653
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
16531654
for (const NYql::TIssue& i : issues) {
16541655
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1662,13 +1663,15 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16621663

16631664
void ReplyUnavailable(const TString& message) {
16641665
LOG_E("UNAVAILABLE: " << message);
1666+
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
16651667
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
16661668
issue.AddSubIssue(new NYql::TIssue(message));
16671669
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
16681670
}
16691671

16701672
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
16711673
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
1674+
TerminateComputeActors(code, issues);
16721675
ReplyErrorAndDie(code, issues);
16731676
}
16741677

@@ -1684,19 +1687,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
16841687
ReplyErrorAndDie(status, &issues);
16851688
}
16861689

1687-
void TimeoutError(TActorId abortSender) {
1690+
void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
16881691
if (AlreadyReplied) {
1689-
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
16901692
return;
16911693
}
16921694

1693-
const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
1694-
const TString message = "Request timeout exceeded";
1695-
1696-
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1697-
1698-
AlreadyReplied = true;
1699-
17001695
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
17011696
if (ExecuterSpan) {
17021697
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1706,24 +1701,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17061701

17071702
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17081703
if (abortSender != Target) {
1709-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
1704+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
17101705
this->Send(Target, abortEv.Release());
17111706
}
17121707

1708+
AlreadyReplied = true;
17131709
LOG_E("Sending timeout response to: " << Target);
1710+
this->Send(Target, ResponseEv.release());
17141711

1715-
this->Shutdown();
1712+
Request.Transactions.crop(0);
1713+
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
1714+
this->PassAway();
17161715
}
17171716

17181717
virtual void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status,
17191718
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17201719
{
17211720
if (AlreadyReplied) {
1722-
LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
17231721
return;
17241722
}
17251723

1726-
TerminateComputeActors(status, "Terminate execution");
1724+
if (Planner) {
1725+
for (auto computeActor : Planner->GetPendingComputeActors()) {
1726+
LOG_D("terminate compute actor " << computeActor.first);
1727+
1728+
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
1729+
this->Send(computeActor.first, ev.Release());
1730+
}
1731+
}
17271732

17281733
AlreadyReplied = true;
17291734
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1749,7 +1754,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17491754
ExecuterSpan.EndError(response.DebugString());
17501755
ExecuterStateSpan.EndError(response.DebugString());
17511756

1752-
this->Shutdown();
1757+
Request.Transactions.crop(0);
1758+
this->Send(Target, ResponseEv.release());
1759+
this->PassAway();
17531760
}
17541761

17551762
protected:
@@ -1817,46 +1824,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18171824
}
18181825

18191826
protected:
1820-
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
1821-
// that `PassAway()` should kill actor immediately.
1822-
virtual void Shutdown() {
1823-
PassAway();
1824-
}
1825-
18261827
void PassAway() override {
1827-
YQL_ENSURE(AlreadyReplied && ResponseEv);
1828-
1829-
// Fill response stats
1830-
{
1831-
auto& response = *ResponseEv->Record.MutableResponse();
1832-
1833-
YQL_ENSURE(Stats);
1834-
1835-
ReportEventElapsedTime();
1836-
1837-
Stats->FinishTs = TInstant::Now();
1838-
Stats->Finish();
1839-
1840-
if (Stats->CollectStatsByLongTasks || CollectFullStats(Request.StatsMode)) {
1841-
response.MutableResult()->MutableStats()->ClearTxPlansWithStats();
1842-
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
1843-
const auto& tx = Request.Transactions[txId].Body;
1844-
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
1845-
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
1846-
}
1847-
}
1848-
1849-
if (Stats->CollectStatsByLongTasks) {
1850-
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
1851-
if (!txPlansWithStats.empty()) {
1852-
LOG_N("Full stats: " << response.GetResult().GetStats());
1853-
}
1854-
}
1855-
}
1856-
1857-
Request.Transactions.crop(0);
1858-
this->Send(Target, ResponseEv.release());
1859-
18601828
for (auto channelPair: ResultChannelProxies) {
18611829
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
18621830

@@ -1877,11 +1845,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18771845

18781846
if (KqpTableResolverId) {
18791847
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
1848+
this->Send(this->SelfId(), new TEvents::TEvPoison);
1849+
LOG_T("Terminate, become ZombieState");
1850+
this->Become(&TKqpExecuterBase::ZombieState);
1851+
} else {
1852+
IActor::PassAway();
18801853
}
1881-
1882-
this->Send(this->SelfId(), new TEvents::TEvPoison);
1883-
LOG_T("Terminate, become ZombieState");
1884-
this->Become(&TKqpExecuterBase::ZombieState);
18851854
}
18861855

18871856
STATEFN(ZombieState) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -644,17 +644,6 @@ bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
644644
return true;
645645
}
646646

647-
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
648-
// NOTE: should be invoked only while shutting down - when node is disconnected.
649-
650-
auto& task = TasksGraph.GetTask(taskId);
651-
652-
YQL_ENSURE(!task.ComputeActorId);
653-
YQL_ENSURE(!task.Meta.Completed);
654-
655-
PendingComputeTasks.erase(taskId);
656-
}
657-
658647
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {
659648
TProgressStat::TEntry consumption;
660649

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class TKqpPlanner {
7676
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7777
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
7878
bool CompletedCA(ui64 taskId, TActorId computeActor);
79-
void TaskNotStarted(ui64 taskId);
8079
TProgressStat::TEntry CalculateConsumptionUpdate();
8180
void ShiftConsumption();
8281
void Submit();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
273273
public:
274274

275275
void Finalize() {
276-
YQL_ENSURE(!AlreadyReplied);
277-
AlreadyReplied = true;
278-
279276
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
280277

281278
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -284,6 +281,8 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
284281
ExecuterSpan.EndOk();
285282
}
286283

284+
LOG_D("Sending response to: " << Target);
285+
Send(Target, ResponseEv.release());
287286
PassAway();
288287
}
289288

0 commit comments

Comments
 (0)