@@ -408,86 +408,48 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
408
408
}
409
409
}
410
410
411
+ YQL_ENSURE (Planner);
412
+ bool populateChannels = Planner->AcknowledgeCA (taskId, computeActor, &state);
413
+
411
414
switch (state.GetState ()) {
412
415
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
413
416
YQL_ENSURE (false , " unexpected state from " << computeActor << " , task: " << taskId);
414
417
return ;
415
418
}
416
419
417
- case NYql::NDqProto::COMPUTE_STATE_FAILURE: {
418
- ReplyErrorAndDie (NYql::NDq::DqStatusToYdbStatus (state.GetStatusCode ()), state.MutableIssues ());
419
- return ;
420
- }
421
-
422
420
case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
423
- // initial TEvState event from Compute Actor
424
- // there can be race with RM answer
425
- if (Planner) {
426
- if (Planner->GetPendingComputeTasks ().erase (taskId)) {
427
- auto it = Planner->GetPendingComputeActors ().emplace (computeActor, TProgressStat ());
428
- YQL_ENSURE (it.second );
429
-
430
- if (state.HasStats ()) {
431
- it.first ->second .Set (state.GetStats ());
432
- }
433
-
434
- auto & task = TasksGraph.GetTask (taskId);
435
- task.ComputeActorId = computeActor;
436
-
437
- THashMap<TActorId, THashSet<ui64>> updates;
438
- CollectTaskChannelsUpdates (task, updates);
439
- PropagateChannelsUpdates (updates);
440
- } else {
441
- auto it = Planner->GetPendingComputeActors ().find (computeActor);
442
- if (it != Planner->GetPendingComputeActors ().end ()) {
443
- if (state.HasStats ()) {
444
- it->second .Set (state.GetStats ());
445
- }
446
- }
447
- }
421
+ if (populateChannels) {
422
+ auto & task = TasksGraph.GetTask (taskId);
423
+ THashMap<TActorId, THashSet<ui64>> updates;
424
+ CollectTaskChannelsUpdates (task, updates);
425
+ PropagateChannelsUpdates (updates);
448
426
}
449
427
break ;
450
428
}
451
429
430
+ case NYql::NDqProto::COMPUTE_STATE_FAILURE:
452
431
case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432
+ ExtraData[computeActor].Swap (state.MutableExtraData ());
453
433
if (Stats) {
454
434
Stats->AddComputeActorStats (
455
435
computeActor.NodeId (),
456
436
std::move (*state.MutableStats ()),
457
437
TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
458
438
);
459
439
}
460
- ExtraData[computeActor].Swap (state.MutableExtraData ());
461
440
462
441
LastTaskId = taskId;
463
442
LastComputeActorId = computeActor.ToString ();
464
-
465
- if (Planner) {
466
- auto it = Planner->GetPendingComputeActors ().find (computeActor);
467
- if (it == Planner->GetPendingComputeActors ().end ()) {
468
- LOG_W (" Got execution state for compute actor: " << computeActor
469
- << " , task: " << taskId
470
- << " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
471
- << " , too early (waiting reply from RM)" );
472
-
473
- if (Planner && Planner->GetPendingComputeTasks ().erase (taskId)) {
474
- LOG_E (" Got execution state for compute actor: " << computeActor
475
- << " , for unknown task: " << state.GetTaskId ()
476
- << " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ()));
477
- return ;
478
- }
479
- } else {
480
- if (state.HasStats ()) {
481
- it->second .Set (state.GetStats ());
482
- }
483
- LastStats.emplace_back (std::move (it->second ));
484
- Planner->GetPendingComputeActors ().erase (it);
485
- YQL_ENSURE (Planner->GetPendingComputeTasks ().find (taskId) == Planner->GetPendingComputeTasks ().end ());
486
- }
487
- }
443
+ YQL_ENSURE (Planner);
444
+ Planner->CompletedCA (taskId, computeActor);
488
445
}
489
446
}
490
447
448
+ if (state.GetState () == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
449
+ ReplyErrorAndDie (NYql::NDq::DqStatusToYdbStatus (state.GetStatusCode ()), state.MutableIssues ());
450
+ return ;
451
+ }
452
+
491
453
static_cast <TDerived*>(this )->CheckExecutionComplete ();
492
454
}
493
455
@@ -683,20 +645,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
683
645
auto taskId = startedTask.GetTaskId ();
684
646
auto & task = TasksGraph.GetTask (taskId);
685
647
686
- task.ComputeActorId = ActorIdFromProto (startedTask.GetActorId ());
687
-
688
- LOG_D (" Executing task: " << taskId << " on compute actor: " << task.ComputeActorId );
689
-
690
- if (Planner) {
691
- if (Planner->GetPendingComputeTasks ().erase (taskId) == 0 ) {
692
- LOG_D (" Executing task: " << taskId << " , compute actor: " << task.ComputeActorId << " , already finished" );
693
- } else {
694
- auto result = Planner->GetPendingComputeActors ().emplace (std::make_pair (task.ComputeActorId , TProgressStat ()));
695
- YQL_ENSURE (result.second );
696
-
697
- CollectTaskChannelsUpdates (task, channelsUpdates);
698
- }
648
+ TActorId computeActorId = ActorIdFromProto (startedTask.GetActorId ());
649
+ LOG_D (" Executing task: " << taskId << " on compute actor: " << computeActorId);
650
+ YQL_ENSURE (Planner);
651
+ bool channelUpdates = Planner->AcknowledgeCA (taskId, computeActorId, nullptr );
652
+ if (channelUpdates) {
653
+ CollectTaskChannelsUpdates (task, channelsUpdates);
699
654
}
655
+
700
656
}
701
657
702
658
PropagateChannelsUpdates (channelsUpdates);
@@ -789,16 +745,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
789
745
LastResourceUsageUpdate = now;
790
746
791
747
TProgressStat::TEntry consumption;
792
- if (Planner) {
793
- for (const auto & p : Planner->GetPendingComputeActors ()) {
794
- const auto & t = p.second .GetLastUsage ();
795
- consumption += t;
796
- }
797
- }
798
748
799
- for (const auto & p : LastStats) {
800
- const auto & t = p.GetLastUsage ();
801
- consumption += t;
749
+ if (Planner) {
750
+ consumption += Planner->CalculateConsumptionUpdate ();
802
751
}
803
752
804
753
auto ru = NRuCalc::CalcRequestUnit (consumption);
@@ -811,13 +760,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
811
760
return ;
812
761
813
762
if (Planner) {
814
- for (auto & p : Planner->GetPendingComputeActors ()) {
815
- p.second .Update ();
816
- }
817
- }
818
-
819
- for (auto & p : LastStats) {
820
- p.Update ();
763
+ Planner->ShiftConsumption ();
821
764
}
822
765
823
766
if (Request.RlPath ) {
@@ -1758,7 +1701,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1758
1701
ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
1759
1702
}
1760
1703
1761
- static_cast <TDerived*>( this )-> FillResponseStats (Ydb::StatusIds::TIMEOUT);
1704
+ FillResponseStats (Ydb::StatusIds::TIMEOUT);
1762
1705
1763
1706
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
1764
1707
if (abortSender != Target) {
@@ -1775,6 +1718,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1775
1718
this ->PassAway ();
1776
1719
}
1777
1720
1721
+ void FillResponseStats (Ydb::StatusIds::StatusCode status) {
1722
+ auto & response = *ResponseEv->Record .MutableResponse ();
1723
+
1724
+ response.SetStatus (status);
1725
+
1726
+ if (Stats) {
1727
+ ReportEventElapsedTime ();
1728
+
1729
+ Stats->FinishTs = TInstant::Now ();
1730
+ Stats->Finish ();
1731
+
1732
+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1733
+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1734
+ const auto & tx = Request.Transactions [txId].Body ;
1735
+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1736
+ response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1737
+ }
1738
+ }
1739
+
1740
+ if (Stats->CollectStatsByLongTasks ) {
1741
+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1742
+ if (!txPlansWithStats.empty ()) {
1743
+ LOG_N (" Full stats: " << txPlansWithStats);
1744
+ }
1745
+ }
1746
+ }
1747
+ }
1748
+
1778
1749
virtual void ReplyErrorAndDie (Ydb::StatusIds::StatusCode status,
1779
1750
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
1780
1751
{
@@ -1794,7 +1765,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1794
1765
AlreadyReplied = true ;
1795
1766
auto & response = *ResponseEv->Record .MutableResponse ();
1796
1767
1797
- response.SetStatus (status);
1768
+ FillResponseStats (status);
1769
+
1798
1770
response.MutableIssues ()->Swap (issues);
1799
1771
1800
1772
LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1972,8 +1944,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1972
1944
TActorId KqpShardsResolverId;
1973
1945
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
1974
1946
1975
- TVector<TProgressStat> LastStats;
1976
-
1977
1947
TInstant StartResolveTime;
1978
1948
TInstant LastResourceUsageUpdate;
1979
1949
0 commit comments