@@ -380,7 +380,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
380
380
this ->Send (channelComputeActorId, ackEv.Release (), /* TODO: undelivery */ 0 , /* cookie */ channelId);
381
381
}
382
382
383
- void HandleComputeStats (NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
383
+ bool HandleComputeState (NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
384
384
TActorId computeActor = ev->Sender ;
385
385
auto & state = ev->Get ()->Record ;
386
386
ui64 taskId = state.GetTaskId ();
@@ -409,27 +409,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
409
409
}
410
410
411
411
YQL_ENSURE (Planner);
412
- bool populateChannels = Planner->AcknowledgeCA (taskId, computeActor, &state);
412
+ bool ack = Planner->AcknowledgeCA (taskId, computeActor, &state);
413
413
414
414
switch (state.GetState ()) {
415
- case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
416
- YQL_ENSURE (false , " unexpected state from " << computeActor << " , task: " << taskId);
417
- return ;
418
- }
419
-
420
- case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
421
- if (populateChannels) {
422
- auto & task = TasksGraph.GetTask (taskId);
423
- THashMap<TActorId, THashSet<ui64>> updates;
424
- CollectTaskChannelsUpdates (task, updates);
425
- PropagateChannelsUpdates (updates);
426
- }
427
- break ;
428
- }
429
-
430
415
case NYql::NDqProto::COMPUTE_STATE_FAILURE:
431
416
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432
417
ExtraData[computeActor].Swap (state.MutableExtraData ());
418
+
433
419
if (Stats) {
434
420
Stats->AddComputeActorStats (
435
421
computeActor.NodeId (),
@@ -440,9 +426,41 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
440
426
441
427
LastTaskId = taskId;
442
428
LastComputeActorId = computeActor.ToString ();
443
- YQL_ENSURE (Planner);
429
+
444
430
Planner->CompletedCA (taskId, computeActor);
445
431
}
432
+ default :
433
+ ; // ignore all other states.
434
+ }
435
+
436
+ return ack;
437
+ }
438
+
439
+ void HandleComputeStats (NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
440
+ TActorId computeActor = ev->Sender ;
441
+ auto & state = ev->Get ()->Record ;
442
+ ui64 taskId = state.GetTaskId ();
443
+
444
+ bool populateChannels = HandleComputeState (ev);
445
+
446
+ switch (state.GetState ()) {
447
+ case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
448
+ YQL_ENSURE (false , " unexpected state from " << computeActor << " , task: " << taskId);
449
+ return ;
450
+ }
451
+
452
+ case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
453
+ if (populateChannels) {
454
+ auto & task = TasksGraph.GetTask (taskId);
455
+ THashMap<TActorId, THashSet<ui64>> updates;
456
+ CollectTaskChannelsUpdates (task, updates);
457
+ PropagateChannelsUpdates (updates);
458
+ }
459
+ break ;
460
+ }
461
+
462
+ default :
463
+ ; // ignore all other states.
446
464
}
447
465
448
466
if (state.GetState () == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
@@ -1854,6 +1872,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1854
1872
1855
1873
void PassAway () override {
1856
1874
YQL_ENSURE (AlreadyReplied && ResponseEv);
1875
+
1876
+ // Actualize stats with the last stats from terminated CAs, but keep the status.
1877
+ FillResponseStats (ResponseEv->Record .GetResponse ().GetStatus ());
1857
1878
this ->Send (Target, ResponseEv.release ());
1858
1879
1859
1880
for (auto channelPair: ResultChannelProxies) {
0 commit comments