@@ -206,50 +206,16 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
206
206
);
207
207
}
208
208
209
- bool LogStatsByLongTasks () const {
210
- return Stats->CollectStatsByLongTasks && HasOlapTable;
211
- }
212
-
213
- void FillResponseStats (Ydb::StatusIds::StatusCode status) {
214
- auto & response = *ResponseEv->Record .MutableResponse ();
215
-
216
- response.SetStatus (status);
217
-
218
- if (Stats) {
219
- ReportEventElapsedTime ();
220
-
221
- Stats->FinishTs = TInstant::Now ();
222
- Stats->Finish ();
223
-
224
- if (LogStatsByLongTasks () || CollectFullStats (Request.StatsMode )) {
225
- for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
226
- const auto & tx = Request.Transactions [txId].Body ;
227
- auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
228
- response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
229
- }
230
- }
231
-
232
- if (LogStatsByLongTasks ()) {
233
- const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
234
- if (!txPlansWithStats.empty ()) {
235
- LOG_N (" Full stats: " << txPlansWithStats);
236
- }
237
- }
238
-
239
- Stats.reset ();
240
- }
241
- }
242
-
243
209
void Finalize () {
210
+ YQL_ENSURE (!AlreadyReplied);
211
+
244
212
if (LocksBroken) {
245
213
return ReplyErrorAndDie (
246
214
Ydb::StatusIds::ABORTED,
247
215
YqlIssue (TPosition (), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, " Transaction locks invalidated. Unknown table." ));
248
216
}
249
217
250
- auto & response = *ResponseEv->Record .MutableResponse ();
251
-
252
- FillResponseStats (Ydb::StatusIds::SUCCESS);
218
+ ResponseEv->Record .MutableResponse ()->SetStatus (Ydb::StatusIds::SUCCESS);
253
219
Counters->TxProxyMon ->ReportStatusOK ->Inc ();
254
220
255
221
auto addLocks = [this ](const auto & data) {
@@ -289,7 +255,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
289
255
if (LockHandle) {
290
256
ResponseEv->LockHandle = std::move (LockHandle);
291
257
}
292
- BuildLocks (*response. MutableResult ()->MutableLocks (), Locks);
258
+ BuildLocks (*ResponseEv-> Record . MutableResponse ()-> MutableResult ()->MutableLocks (), Locks);
293
259
}
294
260
295
261
auto resultSize = ResponseEv->GetByteSize ();
@@ -315,9 +281,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
315
281
316
282
ExecuterSpan.EndOk ();
317
283
318
- Request.Transactions .crop (0 );
319
- LOG_D (" Sending response to: " << Target << " , results: " << ResponseEv->ResultsSize ());
320
- Send (Target, ResponseEv.release ());
284
+ AlreadyReplied = true ;
321
285
PassAway ();
322
286
}
323
287
@@ -357,6 +321,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
357
321
return " WaitSnapshotState" ;
358
322
} else if (func == &TThis::WaitResolveState) {
359
323
return " WaitResolveState" ;
324
+ } else if (func == &TThis::WaitShutdownState) {
325
+ return " WaitShutdownState" ;
360
326
} else {
361
327
return TBase::CurrentStateFuncName ();
362
328
}
@@ -586,7 +552,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
586
552
if (ev->Get ()->Record .GetState () == NDqProto::COMPUTE_STATE_FAILURE) {
587
553
CancelProposal (0 );
588
554
}
589
- HandleComputeStats (ev);
555
+ HandleComputeState (ev);
590
556
}
591
557
592
558
void HandlePrepare (TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -1067,7 +1033,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
1067
1033
hFunc (TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
1068
1034
hFunc (TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
1069
1035
hFunc (TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
1070
- hFunc (TEvDqCompute::TEvState, HandleComputeStats );
1036
+ hFunc (TEvDqCompute::TEvState, HandleComputeState );
1071
1037
hFunc (NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
1072
1038
hFunc (TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
1073
1039
hFunc (TEvKqp::TEvAbortExecution, HandleExecute);
@@ -2685,6 +2651,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2685
2651
}
2686
2652
}
2687
2653
2654
+ void Shutdown () override {
2655
+ if (Planner) {
2656
+ if (Planner->GetPendingComputeTasks ().empty () && Planner->GetPendingComputeActors ().empty ()) {
2657
+ LOG_I (" Shutdown immediately - nothing to wait" );
2658
+ PassAway ();
2659
+ } else {
2660
+ this ->Become (&TThis::WaitShutdownState);
2661
+ LOG_I (" Waiting for shutdown of " << Planner->GetPendingComputeTasks ().size () << " tasks and "
2662
+ << Planner->GetPendingComputeActors ().size () << " compute actors" );
2663
+ // TODO(ilezhankin): the CA awaiting timeout should be configurable.
2664
+ TActivationContext::Schedule (TDuration::Seconds (10 ), new IEventHandle (SelfId (), SelfId (), new TEvents::TEvPoison));
2665
+ }
2666
+ } else {
2667
+ PassAway ();
2668
+ }
2669
+ }
2670
+
2688
2671
void PassAway () override {
2689
2672
auto totalTime = TInstant::Now () - StartTime;
2690
2673
Counters->Counters ->DataTxTotalTimeHistogram ->Collect (totalTime.MilliSeconds ());
@@ -2702,6 +2685,54 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
2702
2685
TBase::PassAway ();
2703
2686
}
2704
2687
2688
+ STATEFN (WaitShutdownState) {
2689
+ switch (ev->GetTypeRewrite ()) {
2690
+ hFunc (TEvDqCompute::TEvState, HandleShutdown);
2691
+ hFunc (TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
2692
+ hFunc (TEvents::TEvPoison, HandleShutdown);
2693
+ default :
2694
+ LOG_E (" Unexpected event: " << ev->GetTypeName ()); // ignore all other events
2695
+ }
2696
+ }
2697
+
2698
+ void HandleShutdown (TEvDqCompute::TEvState::TPtr& ev) {
2699
+ HandleComputeStats (ev);
2700
+
2701
+ if (Planner->GetPendingComputeTasks ().empty () && Planner->GetPendingComputeActors ().empty ()) {
2702
+ PassAway ();
2703
+ }
2704
+ }
2705
+
2706
+ void HandleShutdown (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
2707
+ const auto nodeId = ev->Get ()->NodeId ;
2708
+ LOG_N (" Node has disconnected while shutdown: " << nodeId);
2709
+
2710
+ YQL_ENSURE (Planner);
2711
+
2712
+ for (const auto & task : TasksGraph.GetTasks ()) {
2713
+ if (task.Meta .NodeId == nodeId && !task.Meta .Completed ) {
2714
+ if (task.ComputeActorId ) {
2715
+ Planner->CompletedCA (task.Id , task.ComputeActorId );
2716
+ } else {
2717
+ Planner->TaskNotStarted (task.Id );
2718
+ }
2719
+ }
2720
+ }
2721
+
2722
+ if (Planner->GetPendingComputeTasks ().empty () && Planner->GetPendingComputeActors ().empty ()) {
2723
+ PassAway ();
2724
+ }
2725
+ }
2726
+
2727
+ void HandleShutdown (TEvents::TEvPoison::TPtr& ev) {
2728
+ // Self-poison means timeout - don't wait anymore.
2729
+ LOG_I (" Timed out on waiting for Compute Actors to finish - forcing shutdown" );
2730
+
2731
+ if (ev->Sender == SelfId ()) {
2732
+ PassAway ();
2733
+ }
2734
+ }
2735
+
2705
2736
private:
2706
2737
void ReplyTxStateUnknown (ui64 shardId) {
2707
2738
auto message = TStringBuilder () << " Tx state unknown for shard " << shardId << " , txid " << TxId;
0 commit comments