Skip to content

Commit 93089ee

Browse files
authored
Apply the last stats received from terminated CAs (#8356)
1 parent 0e39c53 commit 93089ee

File tree

5 files changed

+51
-34
lines changed

5 files changed

+51
-34
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
533533
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
534534
CancelProposal(0);
535535
}
536-
HandleComputeStats(ev);
536+
HandleComputeState(ev);
537537
}
538538

539539
void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -1015,7 +1015,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
10151015
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
10161016
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
10171017
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
1018-
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
1018+
hFunc(TEvDqCompute::TEvState, HandleComputeState);
10191019
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
10201020
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
10211021
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
@@ -2646,6 +2646,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26462646
this->Become(&TThis::WaitShutdownState);
26472647
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
26482648
<< Planner->GetPendingComputeActors().size() << " compute actors");
2649+
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
26492650
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
26502651
}
26512652
} else {
@@ -2681,17 +2682,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26812682
}
26822683

26832684
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2684-
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2685-
YQL_ENSURE(Planner);
2686-
2687-
TActorId actor = ev->Sender;
2688-
ui64 taskId = ev->Get()->Record.GetTaskId();
2689-
2690-
Planner->CompletedCA(taskId, actor);
2685+
HandleComputeStats(ev);
26912686

2692-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2693-
PassAway();
2694-
}
2687+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2688+
PassAway();
26952689
}
26962690
}
26972691

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
380380
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
381381
}
382382

383-
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
383+
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
384384
TActorId computeActor = ev->Sender;
385385
auto& state = ev->Get()->Record;
386386
ui64 taskId = state.GetTaskId();
@@ -409,7 +409,39 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
409409
}
410410

411411
YQL_ENSURE(Planner);
412-
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
412+
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
413+
414+
switch (state.GetState()) {
415+
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
416+
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
417+
// Don't finalize stats twice.
418+
if (Planner->CompletedCA(taskId, computeActor)) {
419+
ExtraData[computeActor].Swap(state.MutableExtraData());
420+
421+
if (Stats) {
422+
Stats->AddComputeActorStats(
423+
computeActor.NodeId(),
424+
std::move(*state.MutableStats()),
425+
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
426+
);
427+
}
428+
429+
LastTaskId = taskId;
430+
LastComputeActorId = computeActor.ToString();
431+
}
432+
default:
433+
; // ignore all other states.
434+
}
435+
436+
return ack;
437+
}
438+
439+
void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
440+
TActorId computeActor = ev->Sender;
441+
auto& state = ev->Get()->Record;
442+
ui64 taskId = state.GetTaskId();
443+
444+
bool populateChannels = HandleComputeStats(ev);
413445

414446
switch (state.GetState()) {
415447
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
@@ -427,22 +459,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
427459
break;
428460
}
429461

430-
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
431-
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432-
ExtraData[computeActor].Swap(state.MutableExtraData());
433-
if (Stats) {
434-
Stats->AddComputeActorStats(
435-
computeActor.NodeId(),
436-
std::move(*state.MutableStats()),
437-
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
438-
);
439-
}
440-
441-
LastTaskId = taskId;
442-
LastComputeActorId = computeActor.ToString();
443-
YQL_ENSURE(Planner);
444-
Planner->CompletedCA(taskId, computeActor);
445-
}
462+
default:
463+
; // ignore all other states.
446464
}
447465

448466
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
@@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18541872

18551873
void PassAway() override {
18561874
YQL_ENSURE(AlreadyReplied && ResponseEv);
1875+
1876+
// Actualize stats with the last stats from terminated CAs, but keep the status.
1877+
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
18571878
this->Send(Target, ResponseEv.release());
18581879

18591880
for (auto channelPair: ResultChannelProxies) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
592592
return false;
593593
}
594594

595-
void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
595+
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
596596
auto& task = TasksGraph.GetTask(taskId);
597597
if (task.Meta.Completed) {
598598
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
599-
return;
599+
return false;
600600
}
601601

602602
task.Meta.Completed = true;
@@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
606606
PendingComputeActors.erase(it);
607607

608608
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
609+
610+
return true;
609611
}
610612

611613
void TKqpPlanner::TaskNotStarted(ui64 taskId) {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TKqpPlanner {
7575
std::unique_ptr<IEventHandle> PlanExecution();
7676
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7777
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
78-
void CompletedCA(ui64 taskId, TActorId computeActor);
78+
bool CompletedCA(ui64 taskId, TActorId computeActor);
7979
void TaskNotStarted(ui64 taskId);
8080
TProgressStat::TEntry CalculateConsumptionUpdate();
8181
void ShiftConsumption();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
110110
STATEFN(ExecuteState) {
111111
try {
112112
switch (ev->GetTypeRewrite()) {
113-
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
113+
hFunc(TEvDqCompute::TEvState, HandleComputeState);
114114
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
115115
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
116116
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);

0 commit comments

Comments
 (0)