Skip to content

Wait for all CAs inside Executer before shutdown #7829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 23, 2024
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ __pycache__/
*.pb.h
*.pb.cc

# Other generated
*.fbs.h

# MacOS specific
.DS_Store

Expand Down
78 changes: 76 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

void Finalize() {
YQL_ENSURE(!AlreadyReplied);

if (LocksBroken) {
TString message = "Transaction locks invalidated.";

Expand Down Expand Up @@ -278,8 +280,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
ExecuterSpan.EndOk();

Request.Transactions.crop(0);
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
Send(Target, ResponseEv.release());
AlreadyReplied = true;
PassAway();
}

Expand Down Expand Up @@ -319,6 +320,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return "WaitSnapshotState";
} else if (func == &TThis::WaitResolveState) {
return "WaitResolveState";
} else if (func == &TThis::WaitShutdownState) {
return "WaitShutdownState";
} else {
return TBase::CurrentStateFuncName();
}
Expand Down Expand Up @@ -2595,6 +2598,22 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void Shutdown() override {
if (Planner) {
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
LOG_I("Shutdown immediately - nothing to wait");
PassAway();
} else {
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
PassAway();
}
}

void PassAway() override {
auto totalTime = TInstant::Now() - StartTime;
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
Expand All @@ -2612,6 +2631,61 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TBase::PassAway();
}

STATEFN(WaitShutdownState) {
switch(ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleShutdown);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
hFunc(TEvents::TEvPoison, HandleShutdown);
default:
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
}
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
YQL_ENSURE(Planner);

TActorId actor = ev->Sender;
ui64 taskId = ev->Get()->Record.GetTaskId();

Planner->CompletedCA(taskId, actor);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}
}

void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
const auto nodeId = ev->Get()->NodeId;
LOG_N("Node has disconnected while shutdown: " << nodeId);

YQL_ENSURE(Planner);

for (const auto& task : TasksGraph.GetTasks()) {
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
if (task.ComputeActorId) {
Planner->CompletedCA(task.Id, task.ComputeActorId);
} else {
Planner->TaskNotStarted(task.Id);
}
}
}

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
// Self-poison means timeout - don't wait anymore.
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");

if (ev->Sender == SelfId()) {
PassAway();
}
}

private:
void ReplyTxStateUnknown(ui64 shardId) {
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;
Expand Down
57 changes: 30 additions & 27 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
TimeoutError(ev->Sender);
} else {
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
}
Expand Down Expand Up @@ -1624,14 +1624,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
protected:
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
if (task.ComputeActorId) {
if (task.ComputeActorId && !task.Meta.Completed) {
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);

auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
this->Send(task.ComputeActorId, ev.Release());
} else {
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can add checks here also and write more explicit message, is it already completed or CA id is not known.

}
}
}
Expand All @@ -1649,7 +1649,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

void InternalError(const NYql::TIssues& issues) {
LOG_E(issues.ToOneLineString());
TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
for (const NYql::TIssue& i : issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
Expand All @@ -1663,15 +1662,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

void ReplyUnavailable(const TString& message) {
LOG_E("UNAVAILABLE: " << message);
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
issue.AddSubIssue(new NYql::TIssue(message));
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
}

void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
TerminateComputeActors(code, issues);
ReplyErrorAndDie(code, issues);
}

Expand All @@ -1687,11 +1684,19 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ReplyErrorAndDie(status, &issues);
}

void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
void TimeoutError(TActorId abortSender) {
if (AlreadyReplied) {
LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
return;
}

const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
const TString message = "Request timeout exceeded";

TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);

AlreadyReplied = true;

LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
if (ExecuterSpan) {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
Expand All @@ -1701,17 +1706,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
if (abortSender != Target) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
this->Send(Target, abortEv.Release());
}

AlreadyReplied = true;
LOG_E("Sending timeout response to: " << Target);
this->Send(Target, ResponseEv.release());

Request.Transactions.crop(0);
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
this->PassAway();
this->Shutdown();
}

void FillResponseStats(Ydb::StatusIds::StatusCode status) {
Expand Down Expand Up @@ -1746,17 +1748,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
if (AlreadyReplied) {
LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
return;
}

if (Planner) {
for (auto computeActor : Planner->GetPendingComputeActors()) {
LOG_D("terminate compute actor " << computeActor.first);

auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
this->Send(computeActor.first, ev.Release());
}
}
TerminateComputeActors(status, "Terminate execution");

AlreadyReplied = true;
auto& response = *ResponseEv->Record.MutableResponse();
Expand All @@ -1782,8 +1778,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
ExecuterStateSpan.EndError(response.DebugString());

Request.Transactions.crop(0);
this->Send(Target, ResponseEv.release());
this->PassAway();
this->Shutdown();
}

protected:
Expand Down Expand Up @@ -1851,7 +1846,16 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

protected:
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
// that `PassAway()` should kill actor immediately.
virtual void Shutdown() {
PassAway();
}

void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);
this->Send(Target, ResponseEv.release());

for (auto channelPair: ResultChannelProxies) {
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());

Expand All @@ -1872,12 +1876,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {

if (KqpTableResolverId) {
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
this->Send(this->SelfId(), new TEvents::TEvPoison);
LOG_T("Terminate, become ZombieState");
this->Become(&TKqpExecuterBase::ZombieState);
} else {
IActor::PassAway();
}

this->Send(this->SelfId(), new TEvents::TEvPoison);
LOG_T("Terminate, become ZombieState");
this->Become(&TKqpExecuterBase::ZombieState);
}

STATEFN(ZombieState) {
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
YQL_ENSURE(it != PendingComputeActors.end());
LastStats.emplace_back(std::move(it->second));
PendingComputeActors.erase(it);
return;

LOG_I("Compute actor has finished execution: " << computeActor.ToString());
}

void TKqpPlanner::TaskNotStarted(ui64 taskId) {
// NOTE: should be invoked only while shutting down - when node is disconnected.

auto& task = TasksGraph.GetTask(taskId);

YQL_ENSURE(!task.ComputeActorId);
YQL_ENSURE(!task.Meta.Completed);

PendingComputeTasks.erase(taskId);
}

TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class TKqpPlanner {
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
void CompletedCA(ui64 taskId, TActorId computeActor);
void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
void Submit();
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
public:

void Finalize() {
YQL_ENSURE(!AlreadyReplied);
AlreadyReplied = true;

FillResponseStats(Ydb::StatusIds::SUCCESS);

LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
Expand All @@ -281,8 +284,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
ExecuterSpan.EndOk();
}

LOG_D("Sending response to: " << Target);
Send(Target, ResponseEv.release());
PassAway();
}

Expand Down
Loading
Loading