Skip to content

Commit ccb5e0e

Browse files
authored
Merge 3e9e20c into dbf5fb8
2 parents dbf5fb8 + 3e9e20c commit ccb5e0e

File tree

2 files changed

+44
-28
lines changed

2 files changed

+44
-28
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,6 +2608,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26082608
this->Become(&TThis::WaitShutdownState);
26092609
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
26102610
<< Planner->GetPendingComputeActors().size() << " compute actors");
2611+
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
26112612
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
26122613
}
26132614
} else {
@@ -2629,6 +2630,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26292630
Send(MakePipePerNodeCacheID(true), new TEvPipeCache::TEvUnlink(0));
26302631
}
26312632

2633+
// Actualize stats with the last stats from terminated CAs, but keep the status.
2634+
YQL_ENSURE(ResponseEv);
2635+
FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
2636+
26322637
TBase::PassAway();
26332638
}
26342639

@@ -2643,17 +2648,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
26432648
}
26442649

26452650
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
2646-
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
2647-
YQL_ENSURE(Planner);
2648-
2649-
TActorId actor = ev->Sender;
2650-
ui64 taskId = ev->Get()->Record.GetTaskId();
2651+
HandleComputeState(ev);
26512652

2652-
Planner->CompletedCA(taskId, actor);
2653-
2654-
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2655-
PassAway();
2656-
}
2653+
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
2654+
PassAway();
26572655
}
26582656
}
26592657

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
380380
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
381381
}
382382

383-
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
383+
bool HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
384384
TActorId computeActor = ev->Sender;
385385
auto& state = ev->Get()->Record;
386386
ui64 taskId = state.GetTaskId();
@@ -409,27 +409,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
409409
}
410410

411411
YQL_ENSURE(Planner);
412-
bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
412+
bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
413413

414414
switch (state.GetState()) {
415-
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
416-
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
417-
return;
418-
}
419-
420-
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
421-
if (populateChannels) {
422-
auto& task = TasksGraph.GetTask(taskId);
423-
THashMap<TActorId, THashSet<ui64>> updates;
424-
CollectTaskChannelsUpdates(task, updates);
425-
PropagateChannelsUpdates(updates);
426-
}
427-
break;
428-
}
429-
430415
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
431416
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432417
ExtraData[computeActor].Swap(state.MutableExtraData());
418+
433419
if (Stats) {
434420
Stats->AddComputeActorStats(
435421
computeActor.NodeId(),
@@ -440,9 +426,41 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
440426

441427
LastTaskId = taskId;
442428
LastComputeActorId = computeActor.ToString();
443-
YQL_ENSURE(Planner);
429+
444430
Planner->CompletedCA(taskId, computeActor);
445431
}
432+
default:
433+
; // ignore all other states.
434+
}
435+
436+
return ack;
437+
}
438+
439+
void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
440+
TActorId computeActor = ev->Sender;
441+
auto& state = ev->Get()->Record;
442+
ui64 taskId = state.GetTaskId();
443+
444+
bool populateChannels = HandleComputeState(ev);
445+
446+
switch (state.GetState()) {
447+
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
448+
YQL_ENSURE(false, "unexpected state from " << computeActor << ", task: " << taskId);
449+
return;
450+
}
451+
452+
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
453+
if (populateChannels) {
454+
auto& task = TasksGraph.GetTask(taskId);
455+
THashMap<TActorId, THashSet<ui64>> updates;
456+
CollectTaskChannelsUpdates(task, updates);
457+
PropagateChannelsUpdates(updates);
458+
}
459+
break;
460+
}
461+
462+
default:
463+
; // ignore all other states.
446464
}
447465

448466
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {

0 commit comments

Comments
 (0)