Skip to content

Commit 7b73908

Browse files
committed
Improve compute actor code
1 parent 4965bc0 commit 7b73908

File tree

1 file changed

+14
-32
lines changed

1 file changed

+14
-32
lines changed

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -515,22 +515,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
515515
RuntimeSettings.TerminateHandler(success, issues);
516516
}
517517

518-
// Send final state to executer to inform about termination.
519-
if (!success) {
520-
auto ev = MakeHolder<TEvDqCompute::TEvState>();
521-
auto& record = ev->Record;
522-
523-
if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
524-
FillStats(record.MutableStats(), /* last */ true);
525-
}
526-
527-
record.SetState(NDqProto::COMPUTE_STATE_FINISHED);
528-
record.SetStatusCode(NDqProto::StatusIds::ABORTED);
529-
record.SetTaskId(Task.GetId());
530-
531-
this->Send(ExecuterId, ev.Release());
532-
}
533-
534518
this->PassAway();
535519
Terminated = true;
536520
}
@@ -570,7 +554,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
570554
}
571555
}
572556

573-
void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
557+
void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool forceTerminate = false)
574558
{
575559
auto execEv = MakeHolder<TEvDqCompute::TEvState>();
576560
auto& record = execEv->Record;
@@ -591,7 +575,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
591575

592576
this->Send(ExecuterId, execEv.Release());
593577

594-
if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
578+
if (!forceTerminate && Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
595579
// checkpointed CAs must not self-destroy
596580
return;
597581
}
@@ -1048,10 +1032,6 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10481032
auto tag = (EEvWakeupTag) ev->Get()->Tag;
10491033
switch (tag) {
10501034
case EEvWakeupTag::TimeoutTag: {
1051-
auto abortEv = MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder()
1052-
<< "Timeout event from compute actor " << this->SelfId()
1053-
<< ", TxId: " << TxId << ", task: " << Task.GetId());
1054-
10551035
if (ComputeActorSpan) {
10561036
ComputeActorSpan.EndError(
10571037
TStringBuilder()
@@ -1060,10 +1040,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10601040
);
10611041
}
10621042

1063-
this->Send(ExecuterId, abortEv.Release());
1064-
1065-
TerminateSources("timeout exceeded", false);
1066-
Terminate(false, "timeout exceeded");
1043+
State = NDqProto::COMPUTE_STATE_FAILURE;
1044+
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true);
10671045
break;
10681046
}
10691047
case EEvWakeupTag::PeriodicStatsTag: {
@@ -1087,8 +1065,9 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
10871065
switch (lostEventType) {
10881066
case TEvDqCompute::TEvState::EventType: {
10891067
CA_LOG_E("Handle undelivered TEvState event, abort execution");
1090-
this->TerminateSources("executer lost", false);
1091-
Terminate(false, "executer lost");
1068+
1069+
TerminateSources("executer lost", false);
1070+
Terminate(false, "executer lost"); // Executer lost - no need to report state
10921071
break;
10931072
}
10941073
default: {
@@ -1134,14 +1113,17 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11341113
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin());
11351114
return;
11361115
}
1116+
11371117
TIssues issues = ev->Get()->GetIssues();
11381118
CA_LOG_E("Handle abort execution event from: " << ev->Sender
11391119
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode())
11401120
<< ", reason: " << issues.ToOneLineString());
11411121

1142-
bool success = ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS;
1143-
1144-
this->TerminateSources(issues, success);
1122+
if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS) {
1123+
State = NDqProto::COMPUTE_STATE_FINISHED;
1124+
} else {
1125+
State = NDqProto::COMPUTE_STATE_FAILURE;
1126+
}
11451127

11461128
if (ev->Sender != ExecuterId) {
11471129
if (ComputeActorSpan) {
@@ -1151,7 +1133,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
11511133
NActors::TActivationContext::Send(ev->Forward(ExecuterId));
11521134
}
11531135

1154-
Terminate(success, issues);
1136+
ReportStateAndMaybeDie(ev->Get()->Record.GetStatusCode(), issues, true);
11551137
}
11561138

11571139
void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {

0 commit comments

Comments
 (0)