@@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
177
177
}
178
178
179
179
void ReportEventElapsedTime () {
180
- if (Stats) {
181
- ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
182
- Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros) ;
183
- }
180
+ YQL_ENSURE (Stats);
181
+
182
+ ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
183
+ Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros);
184
184
}
185
185
186
186
protected:
@@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
330
330
}
331
331
332
332
YQL_ENSURE (channel.DstTask == 0 );
333
+ YQL_ENSURE (Stats);
333
334
334
- if (Stats) {
335
- Stats->ResultBytes += batch.Size ();
336
- Stats->ResultRows += batch.RowCount ();
337
- }
335
+ Stats->ResultBytes += batch.Size ();
336
+ Stats->ResultRows += batch.RowCount ();
338
337
339
338
LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << task.Meta .ShardId
340
339
<< " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
@@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
391
390
<< " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
392
391
<< " , stats: " << state.GetStats ());
393
392
394
- if (Stats && state.HasStats () && Request.ProgressStatsPeriod ) {
393
+ YQL_ENSURE (Stats);
394
+
395
+ if (state.HasStats () && Request.ProgressStatsPeriod ) {
395
396
Stats->UpdateTaskStats (taskId, state.GetStats ());
396
397
auto now = TInstant::Now ();
397
398
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418
419
if (Planner->CompletedCA (taskId, computeActor)) {
419
420
ExtraData[computeActor].Swap (state.MutableExtraData ());
420
421
421
- if (Stats) {
422
- Stats->AddComputeActorStats (
423
- computeActor.NodeId (),
424
- std::move (*state.MutableStats ()),
425
- TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
- );
427
- }
422
+ Stats->AddComputeActorStats (
423
+ computeActor.NodeId (),
424
+ std::move (*state.MutableStats ()),
425
+ TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
+ );
428
427
429
428
LastTaskId = taskId;
430
429
LastComputeActorId = computeActor.ToString ();
@@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512
511
auto now = TAppData::TimeProvider->Now ();
513
512
StartResolveTime = now;
514
513
515
- if (Stats) {
516
- Stats-> StartTs = now;
517
- }
514
+ YQL_ENSURE (Stats);
515
+
516
+ Stats-> StartTs = now;
518
517
}
519
518
520
519
TMaybe<size_t > FindReadRangesSource (const NKqpProto::TKqpPhyStage& stage) {
@@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1167
1166
: Nothing ();
1168
1167
1169
1168
YQL_ENSURE (!shardsResolved || nodeId);
1169
+ YQL_ENSURE (Stats);
1170
1170
1171
- if (shardId && Stats ) {
1171
+ if (shardId) {
1172
1172
Stats->AffectedShards .insert (*shardId);
1173
1173
}
1174
1174
@@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1236
1236
1237
1237
if (partitions.size () > 0 && source.GetSequentialInFlightShards () > 0 && partitions.size () > source.GetSequentialInFlightShards ()) {
1238
1238
auto [startShard, shardInfo] = MakeVirtualTablePartition (source, stageInfo, HolderFactory (), TypeEnv ());
1239
- if (Stats) {
1240
- for (auto & [shardId, _] : partitions) {
1241
- Stats->AffectedShards .insert (shardId);
1242
- }
1239
+
1240
+ YQL_ENSURE (Stats);
1241
+
1242
+ for (auto & [shardId, _] : partitions) {
1243
+ Stats->AffectedShards .insert (shardId);
1243
1244
}
1245
+
1244
1246
if (shardInfo.KeyReadRanges ) {
1245
1247
addPartiton (startShard, {}, shardInfo, source.GetSequentialInFlightShards ());
1246
1248
fillRangesForTasks ();
@@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1507
1509
THashMap<ui64, ui64> assignedShardsCount;
1508
1510
auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
1509
1511
1512
+ YQL_ENSURE (Stats);
1513
+
1510
1514
const auto & tableInfo = stageInfo.Meta .TableConstInfo ;
1511
1515
const auto & keyTypes = tableInfo->KeyColumnTypes ;
1512
1516
ui32 metaId = 0 ;
@@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1535
1539
nodeShards[nodeId].emplace_back (TShardInfoWithId (i.first , std::move (i.second )));
1536
1540
}
1537
1541
1538
- if (Stats && CollectProfileStats (Request.StatsMode )) {
1542
+ if (CollectProfileStats (Request.StatsMode )) {
1539
1543
for (auto && i : nodeShards) {
1540
1544
Stats->AddNodeShardsCount (stageInfo.Id .StageId , i.first , i.second .size ());
1541
1545
}
@@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1720
1724
ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
1721
1725
}
1722
1726
1723
- FillResponseStats (Ydb::StatusIds::TIMEOUT);
1727
+ ResponseEv-> Record . MutableResponse ()-> SetStatus (Ydb::StatusIds::TIMEOUT);
1724
1728
1725
1729
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
1726
1730
if (abortSender != Target) {
@@ -1730,38 +1734,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1730
1734
1731
1735
LOG_E (" Sending timeout response to: " << Target);
1732
1736
1733
- Request.Transactions .crop (0 );
1734
1737
this ->Shutdown ();
1735
1738
}
1736
1739
1737
- void FillResponseStats (Ydb::StatusIds::StatusCode status) {
1738
- auto & response = *ResponseEv->Record .MutableResponse ();
1739
-
1740
- response.SetStatus (status);
1741
-
1742
- if (Stats) {
1743
- ReportEventElapsedTime ();
1744
-
1745
- Stats->FinishTs = TInstant::Now ();
1746
- Stats->Finish ();
1747
-
1748
- if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1749
- for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1750
- const auto & tx = Request.Transactions [txId].Body ;
1751
- auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1752
- response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1753
- }
1754
- }
1755
-
1756
- if (Stats->CollectStatsByLongTasks ) {
1757
- const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1758
- if (!txPlansWithStats.empty ()) {
1759
- LOG_N (" Full stats: " << txPlansWithStats);
1760
- }
1761
- }
1762
- }
1763
- }
1764
-
1765
1740
virtual void ReplyErrorAndDie (Ydb::StatusIds::StatusCode status,
1766
1741
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
1767
1742
{
@@ -1775,8 +1750,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1775
1750
AlreadyReplied = true ;
1776
1751
auto & response = *ResponseEv->Record .MutableResponse ();
1777
1752
1778
- FillResponseStats (status);
1779
-
1753
+ response.SetStatus (status);
1780
1754
response.MutableIssues ()->Swap (issues);
1781
1755
1782
1756
LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1795,7 +1769,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1795
1769
ExecuterSpan.EndError (response.DebugString ());
1796
1770
ExecuterStateSpan.EndError (response.DebugString ());
1797
1771
1798
- Request.Transactions .crop (0 );
1799
1772
this ->Shutdown ();
1800
1773
}
1801
1774
@@ -1873,8 +1846,35 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1873
1846
void PassAway () override {
1874
1847
YQL_ENSURE (AlreadyReplied && ResponseEv);
1875
1848
1876
- // Actualize stats with the last stats from terminated CAs, but keep the status.
1877
- FillResponseStats (ResponseEv->Record .GetResponse ().GetStatus ());
1849
+ // Fill response stats
1850
+ {
1851
+ auto & response = *ResponseEv->Record .MutableResponse ();
1852
+
1853
+ YQL_ENSURE (Stats);
1854
+
1855
+ ReportEventElapsedTime ();
1856
+
1857
+ Stats->FinishTs = TInstant::Now ();
1858
+ Stats->Finish ();
1859
+
1860
+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1861
+ response.MutableResult ()->MutableStats ()->ClearTxPlansWithStats ();
1862
+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1863
+ const auto & tx = Request.Transactions [txId].Body ;
1864
+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1865
+ response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1866
+ }
1867
+ }
1868
+
1869
+ if (Stats->CollectStatsByLongTasks ) {
1870
+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1871
+ if (!txPlansWithStats.empty ()) {
1872
+ LOG_N (" Full stats: " << response.GetResult ().GetStats ());
1873
+ }
1874
+ }
1875
+ }
1876
+
1877
+ Request.Transactions .crop (0 );
1878
1878
this ->Send (Target, ResponseEv.release ());
1879
1879
1880
1880
for (auto channelPair: ResultChannelProxies) {
0 commit comments