@@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
177
177
}
178
178
179
179
void ReportEventElapsedTime () {
180
- if (Stats) {
181
- ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
182
- Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros) ;
183
- }
180
+ YQL_ENSURE (Stats);
181
+
182
+ ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
183
+ Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros);
184
184
}
185
185
186
186
protected:
@@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
330
330
}
331
331
332
332
YQL_ENSURE (channel.DstTask == 0 );
333
+ YQL_ENSURE (Stats);
333
334
334
- if (Stats) {
335
- Stats->ResultBytes += batch.Size ();
336
- Stats->ResultRows += batch.RowCount ();
337
- }
335
+ Stats->ResultBytes += batch.Size ();
336
+ Stats->ResultRows += batch.RowCount ();
338
337
339
338
LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << task.Meta .ShardId
340
339
<< " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
@@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
391
390
<< " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
392
391
<< " , stats: " << state.GetStats ());
393
392
394
- if (Stats && state.HasStats () && Request.ProgressStatsPeriod ) {
393
+ YQL_ENSURE (Stats);
394
+
395
+ if (state.HasStats () && Request.ProgressStatsPeriod ) {
395
396
Stats->UpdateTaskStats (taskId, state.GetStats ());
396
397
auto now = TInstant::Now ();
397
398
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -401,7 +402,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
401
402
for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
402
403
const auto & tx = Request.Transactions [txId].Body ;
403
404
auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), execStats);
404
- execStats.AddTxPlansWithStats (planWithStats) ;
405
+ (* execStats.MutableTxPlansWithStats ())[txId] = planWithStats ;
405
406
}
406
407
this ->Send (Target, progress.Release ());
407
408
LastProgressStats = now;
@@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418
419
if (Planner->CompletedCA (taskId, computeActor)) {
419
420
ExtraData[computeActor].Swap (state.MutableExtraData ());
420
421
421
- if (Stats) {
422
- Stats->AddComputeActorStats (
423
- computeActor.NodeId (),
424
- std::move (*state.MutableStats ()),
425
- TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
- );
427
- }
422
+ Stats->AddComputeActorStats (
423
+ computeActor.NodeId (),
424
+ std::move (*state.MutableStats ()),
425
+ TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426
+ );
428
427
429
428
LastTaskId = taskId;
430
429
LastComputeActorId = computeActor.ToString ();
@@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512
511
auto now = TAppData::TimeProvider->Now ();
513
512
StartResolveTime = now;
514
513
515
- if (Stats) {
516
- Stats-> StartTs = now;
517
- }
514
+ YQL_ENSURE (Stats);
515
+
516
+ Stats-> StartTs = now;
518
517
}
519
518
520
519
TMaybe<size_t > FindReadRangesSource (const NKqpProto::TKqpPhyStage& stage) {
@@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1167
1166
: Nothing ();
1168
1167
1169
1168
YQL_ENSURE (!shardsResolved || nodeId);
1169
+ YQL_ENSURE (Stats);
1170
1170
1171
- if (shardId && Stats ) {
1171
+ if (shardId) {
1172
1172
Stats->AffectedShards .insert (*shardId);
1173
1173
}
1174
1174
@@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1236
1236
1237
1237
if (partitions.size () > 0 && source.GetSequentialInFlightShards () > 0 && partitions.size () > source.GetSequentialInFlightShards ()) {
1238
1238
auto [startShard, shardInfo] = MakeVirtualTablePartition (source, stageInfo, HolderFactory (), TypeEnv ());
1239
- if (Stats) {
1240
- for (auto & [shardId, _] : partitions) {
1241
- Stats->AffectedShards .insert (shardId);
1242
- }
1239
+
1240
+ YQL_ENSURE (Stats);
1241
+
1242
+ for (auto & [shardId, _] : partitions) {
1243
+ Stats->AffectedShards .insert (shardId);
1243
1244
}
1245
+
1244
1246
if (shardInfo.KeyReadRanges ) {
1245
1247
addPartiton (startShard, {}, shardInfo, source.GetSequentialInFlightShards ());
1246
1248
fillRangesForTasks ();
@@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1507
1509
THashMap<ui64, ui64> assignedShardsCount;
1508
1510
auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
1509
1511
1512
+ YQL_ENSURE (Stats);
1513
+
1510
1514
const auto & tableInfo = stageInfo.Meta .TableConstInfo ;
1511
1515
const auto & keyTypes = tableInfo->KeyColumnTypes ;
1512
1516
ui32 metaId = 0 ;
@@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1535
1539
nodeShards[nodeId].emplace_back (TShardInfoWithId (i.first , std::move (i.second )));
1536
1540
}
1537
1541
1538
- if (Stats && CollectProfileStats (Request.StatsMode )) {
1542
+ if (CollectProfileStats (Request.StatsMode )) {
1539
1543
for (auto && i : nodeShards) {
1540
1544
Stats->AddNodeShardsCount (stageInfo.Id .StageId , i.first , i.second .size ());
1541
1545
}
@@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1720
1724
ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
1721
1725
}
1722
1726
1723
- FillResponseStats (Ydb::StatusIds::TIMEOUT);
1727
+ ResponseEv-> Record . MutableResponse ()-> SetStatus (Ydb::StatusIds::TIMEOUT);
1724
1728
1725
1729
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
1726
1730
if (abortSender != Target) {
@@ -1730,34 +1734,31 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1730
1734
1731
1735
LOG_E (" Sending timeout response to: " << Target);
1732
1736
1733
- Request.Transactions .crop (0 );
1734
1737
this ->Shutdown ();
1735
1738
}
1736
1739
1737
- void FillResponseStats (Ydb::StatusIds::StatusCode status ) {
1740
+ void FillResponseStats () {
1738
1741
auto & response = *ResponseEv->Record .MutableResponse ();
1739
1742
1740
- response. SetStatus (status );
1743
+ YQL_ENSURE (Stats );
1741
1744
1742
- if (Stats) {
1743
- ReportEventElapsedTime ();
1745
+ ReportEventElapsedTime ();
1744
1746
1745
- Stats->FinishTs = TInstant::Now ();
1746
- Stats->Finish ();
1747
+ Stats->FinishTs = TInstant::Now ();
1748
+ Stats->Finish ();
1747
1749
1748
- if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1749
- for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1750
- const auto & tx = Request.Transactions [txId].Body ;
1751
- auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1752
- response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1753
- }
1750
+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1751
+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1752
+ const auto & tx = Request.Transactions [txId].Body ;
1753
+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1754
+ (*response.MutableResult ()->MutableStats ()->MutableTxPlansWithStats ())[txId] = planWithStats;
1754
1755
}
1756
+ }
1755
1757
1756
- if (Stats->CollectStatsByLongTasks ) {
1757
- const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1758
- if (!txPlansWithStats.empty ()) {
1759
- LOG_N (" Full stats: " << txPlansWithStats);
1760
- }
1758
+ if (Stats->CollectStatsByLongTasks ) {
1759
+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1760
+ if (!txPlansWithStats.empty ()) {
1761
+ LOG_N (" Full stats: " << response.GetResult ().GetStats ());
1761
1762
}
1762
1763
}
1763
1764
}
@@ -1775,8 +1776,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1775
1776
AlreadyReplied = true ;
1776
1777
auto & response = *ResponseEv->Record .MutableResponse ();
1777
1778
1778
- FillResponseStats (status);
1779
-
1779
+ response.SetStatus (status);
1780
1780
response.MutableIssues ()->Swap (issues);
1781
1781
1782
1782
LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1795,7 +1795,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1795
1795
ExecuterSpan.EndError (response.DebugString ());
1796
1796
ExecuterStateSpan.EndError (response.DebugString ());
1797
1797
1798
- Request.Transactions .crop (0 );
1799
1798
this ->Shutdown ();
1800
1799
}
1801
1800
@@ -1872,11 +1871,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
1872
1871
1873
1872
void PassAway () override {
1874
1873
YQL_ENSURE (AlreadyReplied && ResponseEv);
1875
-
1876
- // Actualize stats with the last stats from terminated CAs, but keep the status.
1877
- FillResponseStats (ResponseEv->Record .GetResponse ().GetStatus ());
1874
+ FillResponseStats ();
1878
1875
this ->Send (Target, ResponseEv.release ());
1879
1876
1877
+ Request.Transactions .crop (0 );
1878
+
1880
1879
for (auto channelPair: ResultChannelProxies) {
1881
1880
LOG_D (" terminate result channel " << channelPair.first << " proxy at " << channelPair.second ->SelfId ());
1882
1881
0 commit comments