Skip to content

Commit 01f7382

Browse files
authored
Merge 9185b23 into acc9844
2 parents acc9844 + 9185b23 commit 01f7382

File tree

5 files changed

+51
-34
lines changed

5 files changed

+51
-34
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
532532
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
533533
CancelProposal(0);
534534
}
535-
HandleComputeStats(ev);
535+
HandleComputeState(ev);
536536
}
537537

538538
void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -1012,7 +1012,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
10121012
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
10131013
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
10141014
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
1015-
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
1015+
hFunc(TEvDqCompute::TEvState, HandleComputeState);
10161016
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
10171017
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
10181018
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
@@ -2638,6 +2638,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26382638
this->Become(&TThis::WaitShutdownState);
26392639
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
26402640
<< Planner->GetPendingComputeActors().size() << " compute actors");
2641+
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
26412642
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
26422643
}
26432644
} else {
@@ -2673,17 +2674,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26732674
}
26742675

26752676
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2676-
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2677-
YQL_ENSURE(Planner);
2678-
2679-
TActorId actor = ev->Sender;
2680-
ui64 taskId = ev->Get()->Record.GetTaskId();
2681-
2682-
Planner->CompletedCA(taskId, actor);
2677+
HandleComputeStats(ev);
26832678

2684-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2685-
PassAway();
2686-
}
2679+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2680+
PassAway();
26872681
}
26882682
}
26892683

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
380380
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
381381
}
382382

383-
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
383+
bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
384384
TActorId computeActor = ev->Sender;
385385
auto& state = ev->Get()->Record;
386386
ui64 taskId = state.GetTaskId();
@@ -409,7 +409,39 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
409409
}
410410

411411
YQL_ENSURE(Planner);
412-
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
412+
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
413+
414+
switch (state.GetState()) {
415+
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
416+
case NYql::NDqProto::COMPUTE_STATE_FINISHED:
417+
// Don't finalize stats twice.
418+
if (Planner->CompletedCA(taskId, computeActor)) {
419+
ExtraData[computeActor].Swap(state.MutableExtraData());
420+
421+
if (Stats) {
422+
Stats->AddComputeActorStats(
423+
computeActor.NodeId(),
424+
std::move(*state.MutableStats()),
425+
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
426+
);
427+
}
428+
429+
LastTaskId = taskId;
430+
LastComputeActorId = computeActor.ToString();
431+
}
432+
default:
433+
; // ignore all other states.
434+
}
435+
436+
return ack;
437+
}
438+
439+
void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
440+
TActorId computeActor = ev->Sender;
441+
auto& state = ev->Get()->Record;
442+
ui64 taskId = state.GetTaskId();
443+
444+
bool populateChannels = HandleComputeStats(ev);
413445

414446
switch (state.GetState()) {
415447
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
@@ -427,22 +459,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
427459
break;
428460
}
429461

430-
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
431-
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432-
ExtraData[computeActor].Swap(state.MutableExtraData());
433-
if (Stats) {
434-
Stats->AddComputeActorStats(
435-
computeActor.NodeId(),
436-
std::move(*state.MutableStats()),
437-
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
438-
);
439-
}
440-
441-
LastTaskId = taskId;
442-
LastComputeActorId = computeActor.ToString();
443-
YQL_ENSURE(Planner);
444-
Planner->CompletedCA(taskId, computeActor);
445-
}
462+
default:
463+
; // ignore all other states.
446464
}
447465

448466
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
@@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18541872

18551873
void PassAway() override {
18561874
YQL_ENSURE(AlreadyReplied && ResponseEv);
1875+
1876+
// Actualize stats with the last stats from terminated CAs, but keep the status.
1877+
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
18571878
this->Send(Target, ResponseEv.release());
18581879

18591880
for (auto channelPair: ResultChannelProxies) {

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
592592
return false;
593593
}
594594

595-
void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
595+
bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
596596
auto& task = TasksGraph.GetTask(taskId);
597597
if (task.Meta.Completed) {
598598
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
599-
return;
599+
return false;
600600
}
601601

602602
task.Meta.Completed = true;
@@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
606606
PendingComputeActors.erase(it);
607607

608608
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
609+
610+
return true;
609611
}
610612

611613
void TKqpPlanner::TaskNotStarted(ui64 taskId) {

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TKqpPlanner {
7575
std::unique_ptr<IEventHandle> PlanExecution();
7676
std::unique_ptr<IEventHandle> AssignTasksToNodes();
7777
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
78-
void CompletedCA(ui64 taskId, TActorId computeActor);
78+
bool CompletedCA(ui64 taskId, TActorId computeActor);
7979
void TaskNotStarted(ui64 taskId);
8080
TProgressStat::TEntry CalculateConsumptionUpdate();
8181
void ShiftConsumption();

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
110110
STATEFN(ExecuteState) {
111111
try {
112112
switch (ev->GetTypeRewrite()) {
113-
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
113+
hFunc(TEvDqCompute::TEvState, HandleComputeState);
114114
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
115115
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
116116
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);

0 commit comments

Comments
 (0)