Skip to content

Commit 3e9e20c

Browse files
committed
Properly handle all CA states and stats
1 parent d3449f0 commit 3e9e20c

File tree

2 files changed

+39
-37
lines changed

2 files changed

+39
-37
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2647,26 +2647,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26472647
}
26482648

26492649
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2650-
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2651-
YQL_ENSURE(Planner);
2652-
2653-
TActorId actor = ev->Sender;
2654-
auto& state = ev->Get()->Record;
2655-
ui64 taskId = state.GetTaskId();
2656-
2657-
if (Stats) {
2658-
Stats->AddComputeActorStats(
2659-
actor.NodeId(),
2660-
std::move(*state.MutableStats()),
2661-
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
2662-
);
2663-
}
2664-
2665-
Planner->CompletedCA(taskId, actor);
2650+
HandleComputeState(ev);
26662651

2667-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2668-
PassAway();
2669-
}
2652+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2653+
PassAway();
26702654
}
26712655
}
26722656

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
380380
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
381381
}
382382

383-
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
383+
bool HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
384384
TActorId computeActor = ev->Sender;
385385
auto& state = ev->Get()->Record;
386386
ui64 taskId = state.GetTaskId();
@@ -409,27 +409,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
409409
}
410410

411411
YQL_ENSURE(Planner);
412-
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
412+
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
413413

414414
switch (state.GetState()) {
415-
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
416-
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
417-
return;
418-
}
419-
420-
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
421-
if (populateChannels) {
422-
auto& task = TasksGraph.GetTask(taskId);
423-
THashMap<TActorId, THashSet<ui64>> updates;
424-
CollectTaskChannelsUpdates(task, updates);
425-
PropagateChannelsUpdates(updates);
426-
}
427-
break;
428-
}
429-
430415
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
431416
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432417
ExtraData[computeActor].Swap(state.MutableExtraData());
418+
433419
if (Stats) {
434420
Stats->AddComputeActorStats(
435421
computeActor.NodeId(),
@@ -440,9 +426,41 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
440426

441427
LastTaskId = taskId;
442428
LastComputeActorId = computeActor.ToString();
443-
YQL_ENSURE(Planner);
429+
444430
Planner->CompletedCA(taskId, computeActor);
445431
}
432+
default:
433+
; // ignore all other states.
434+
}
435+
436+
return ack;
437+
}
438+
439+
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
440+
TActorId computeActor = ev->Sender;
441+
auto& state = ev->Get()->Record;
442+
ui64 taskId = state.GetTaskId();
443+
444+
bool populateChannels = HandleComputeState(ev);
445+
446+
switch (state.GetState()) {
447+
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
448+
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
449+
return;
450+
}
451+
452+
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
453+
if (populateChannels) {
454+
auto& task = TasksGraph.GetTask(taskId);
455+
THashMap<TActorId, THashSet<ui64>> updates;
456+
CollectTaskChannelsUpdates(task, updates);
457+
PropagateChannelsUpdates(updates);
458+
}
459+
break;
460+
}
461+
462+
default:
463+
; // ignore all other states.
446464
}
447465

448466
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {

0 commit comments

Comments
 (0)