@@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
177
177
}
178
178
179
179
void ReportEventElapsedTime () {
180
- if (Stats) {
181
- ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
182
- Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros) ;
183
- }
180
+ YQL_ENSURE (Stats);
181
+
182
+ ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
183
+ Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros);
184
184
}
185
185
186
186
protected:
@@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
330
330
}
331
331
332
332
YQL_ENSURE (channel.DstTask == 0 );
333
+ YQL_ENSURE (Stats);
333
334
334
- if (Stats) {
335
- Stats->ResultBytes += batch.Size ();
336
- Stats->ResultRows += batch.RowCount ();
337
- }
335
+ Stats->ResultBytes += batch.Size ();
336
+ Stats->ResultRows += batch.RowCount ();
338
337
339
338
LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << task.Meta .ShardId
340
339
<< " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
@@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
391
390
<< " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
392
391
<< " , stats: " << state.GetStats ());
393
392
394
- if (Stats && state.HasStats () && Request.ProgressStatsPeriod ) {
393
+ YQL_ENSURE (Stats);
394
+
395
+ if (state.HasStats () && Request.ProgressStatsPeriod ) {
395
396
Stats->UpdateTaskStats (taskId, state.GetStats ());
396
397
auto now = TInstant::Now ();
397
398
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418
419
if (Planner->CompletedCA (taskId, computeActor)) {
419
420
ExtraData[computeActor].Swap (state.MutableExtraData ());
420
421
421
- if (Stats) {
422
- Stats->AddComputeActorStats (
423
- computeActor.NodeId (),
424
- std::move (*state.MutableStats ()),
425
- TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
- );
427
- }
422
+ Stats->AddComputeActorStats (
423
+ computeActor.NodeId (),
424
+ std::move (*state.MutableStats ()),
425
+ TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
+ );
428
427
429
428
LastTaskId = taskId;
430
429
LastComputeActorId = computeActor.ToString ();
@@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512
511
auto now = TAppData::TimeProvider->Now ();
513
512
StartResolveTime = now;
514
513
515
- if (Stats) {
516
- Stats-> StartTs = now;
517
- }
514
+ YQL_ENSURE (Stats);
515
+
516
+ Stats-> StartTs = now;
518
517
}
519
518
520
519
TMaybe<size_t > FindReadRangesSource (const NKqpProto::TKqpPhyStage& stage) {
@@ -1148,8 +1147,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1148
1147
: Nothing ();
1149
1148
1150
1149
YQL_ENSURE (!shardsResolved || nodeId);
1150
+ YQL_ENSURE (Stats);
1151
1151
1152
- if (shardId && Stats ) {
1152
+ if (shardId) {
1153
1153
Stats->AffectedShards .insert (*shardId);
1154
1154
}
1155
1155
@@ -1217,11 +1217,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1217
1217
1218
1218
if (partitions.size () > 0 && source.GetSequentialInFlightShards () > 0 && partitions.size () > source.GetSequentialInFlightShards ()) {
1219
1219
auto [startShard, shardInfo] = MakeVirtualTablePartition (source, stageInfo, HolderFactory (), TypeEnv ());
1220
- if (Stats) {
1221
- for (auto & [shardId, _] : partitions) {
1222
- Stats->AffectedShards .insert (shardId);
1223
- }
1220
+
1221
+ YQL_ENSURE (Stats);
1222
+
1223
+ for (auto & [shardId, _] : partitions) {
1224
+ Stats->AffectedShards .insert (shardId);
1224
1225
}
1226
+
1225
1227
if (shardInfo.KeyReadRanges ) {
1226
1228
addPartiton (startShard, {}, shardInfo, source.GetSequentialInFlightShards ());
1227
1229
fillRangesForTasks ();
@@ -1484,6 +1486,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1484
1486
THashMap<ui64, ui64> assignedShardsCount;
1485
1487
auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
1486
1488
1489
+ YQL_ENSURE (Stats);
1490
+
1487
1491
const auto & tableInfo = stageInfo.Meta .TableConstInfo ;
1488
1492
const auto & keyTypes = tableInfo->KeyColumnTypes ;
1489
1493
ui32 metaId = 0 ;
@@ -1512,7 +1516,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1512
1516
nodeShards[nodeId].emplace_back (TShardInfoWithId (i.first , std::move (i.second )));
1513
1517
}
1514
1518
1515
- if (Stats && CollectProfileStats (Request.StatsMode )) {
1519
+ if (CollectProfileStats (Request.StatsMode )) {
1516
1520
for (auto && i : nodeShards) {
1517
1521
Stats->AddNodeShardsCount (stageInfo.Id .StageId , i.first , i.second .size ());
1518
1522
}
@@ -1697,7 +1701,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1697
1701
ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
1698
1702
}
1699
1703
1700
- FillResponseStats (Ydb::StatusIds::TIMEOUT);
1704
+ ResponseEv-> Record . MutableResponse ()-> SetStatus (Ydb::StatusIds::TIMEOUT);
1701
1705
1702
1706
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
1703
1707
if (abortSender != Target) {
@@ -1707,38 +1711,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1707
1711
1708
1712
LOG_E (" Sending timeout response to: " << Target);
1709
1713
1710
- Request.Transactions .crop (0 );
1711
1714
this ->Shutdown ();
1712
1715
}
1713
1716
1714
- void FillResponseStats (Ydb::StatusIds::StatusCode status) {
1715
- auto & response = *ResponseEv->Record .MutableResponse ();
1716
-
1717
- response.SetStatus (status);
1718
-
1719
- if (Stats) {
1720
- ReportEventElapsedTime ();
1721
-
1722
- Stats->FinishTs = TInstant::Now ();
1723
- Stats->Finish ();
1724
-
1725
- if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1726
- for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1727
- const auto & tx = Request.Transactions [txId].Body ;
1728
- auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1729
- response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1730
- }
1731
- }
1732
-
1733
- if (Stats->CollectStatsByLongTasks ) {
1734
- const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1735
- if (!txPlansWithStats.empty ()) {
1736
- LOG_N (" Full stats: " << txPlansWithStats);
1737
- }
1738
- }
1739
- }
1740
- }
1741
-
1742
1717
virtual void ReplyErrorAndDie (Ydb::StatusIds::StatusCode status,
1743
1718
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
1744
1719
{
@@ -1752,8 +1727,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1752
1727
AlreadyReplied = true ;
1753
1728
auto & response = *ResponseEv->Record .MutableResponse ();
1754
1729
1755
- FillResponseStats (status);
1756
-
1730
+ response.SetStatus (status);
1757
1731
response.MutableIssues ()->Swap (issues);
1758
1732
1759
1733
LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1772,7 +1746,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1772
1746
ExecuterSpan.EndError (response.DebugString ());
1773
1747
ExecuterStateSpan.EndError (response.DebugString ());
1774
1748
1775
- Request.Transactions .crop (0 );
1776
1749
this ->Shutdown ();
1777
1750
}
1778
1751
@@ -1850,8 +1823,35 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1850
1823
void PassAway () override {
1851
1824
YQL_ENSURE (AlreadyReplied && ResponseEv);
1852
1825
1853
- // Actualize stats with the last stats from terminated CAs, but keep the status.
1854
- FillResponseStats (ResponseEv->Record .GetResponse ().GetStatus ());
1826
+ // Fill response stats
1827
+ {
1828
+ auto & response = *ResponseEv->Record .MutableResponse ();
1829
+
1830
+ YQL_ENSURE (Stats);
1831
+
1832
+ ReportEventElapsedTime ();
1833
+
1834
+ Stats->FinishTs = TInstant::Now ();
1835
+ Stats->Finish ();
1836
+
1837
+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1838
+ response.MutableResult ()->MutableStats ()->ClearTxPlansWithStats ();
1839
+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1840
+ const auto & tx = Request.Transactions [txId].Body ;
1841
+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1842
+ response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1843
+ }
1844
+ }
1845
+
1846
+ if (Stats->CollectStatsByLongTasks ) {
1847
+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1848
+ if (!txPlansWithStats.empty ()) {
1849
+ LOG_N (" Full stats: " << response.GetResult ().GetStats ());
1850
+ }
1851
+ }
1852
+ }
1853
+
1854
+ Request.Transactions .crop (0 );
1855
1855
this ->Send (Target, ResponseEv.release ());
1856
1856
1857
1857
for (auto channelPair: ResultChannelProxies) {
0 commit comments