@@ -58,7 +58,7 @@ TColumnShardScan::TColumnShardScan(const TActorId& columnShardActorId, const TAc
58
58
, DataFormat(dataFormat)
59
59
, TabletId(tabletId)
60
60
, ReadMetadataRange(readMetadataRange)
61
- , Deadline(TInstant::Now() + ( timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT) )
61
+ , Timeout( timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)
62
62
, ScanCountersPool(scanCountersPool)
63
63
, Stats(NTracing::TTraceClient::GetLocalClient(" SHARD" , ::ToString(TabletId)/* , "SCAN_TXID:" + ::ToString(TxId)*/ ))
64
64
, ComputeShardingPolicy(computeShardingPolicy) {
@@ -72,7 +72,6 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
72
72
);
73
73
auto g = Stats->MakeGuard (" bootstrap" );
74
74
ScanActorId = ctx.SelfID ;
75
- Schedule (Deadline, new TEvents::TEvWakeup);
76
75
77
76
Y_ABORT_UNLESS (!ScanIterator);
78
77
ResourceSubscribeActorId = ctx.Register (new NResourceBroker::NSubscribe::TActor (TabletId, SelfId ()));
@@ -88,6 +87,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
88
87
SendScanError (" scanner_start_error:" + startResult.GetErrorMessage ());
89
88
Finish (NColumnShard::TScanCounters::EStatusFinish::ProblemOnStart);
90
89
} else {
90
+ ScheduleWakeup (GetDeadline ());
91
91
92
92
// propagate self actor id // TODO: FlagSubscribeOnSession ?
93
93
Send (ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor (ScanId, ctx.SelfID , ScanGen, TabletId), IEventHandle::FlagTrackDelivery);
@@ -176,7 +176,11 @@ void TColumnShardScan::HandleScan(TEvents::TEvWakeup::TPtr& /*ev*/) {
176
176
" Scan " << ScanActorId << " guard execution timeout"
177
177
<< " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
178
178
179
- Finish (NColumnShard::TScanCounters::EStatusFinish::Deadline);
179
+ if (TMonotonic::Now () >= GetDeadline ()) {
180
+ Finish (NColumnShard::TScanCounters::EStatusFinish::Deadline);
181
+ } else {
182
+ ScheduleWakeup (GetDeadline ());
183
+ }
180
184
}
181
185
182
186
bool TColumnShardScan::ProduceResults () noexcept {
@@ -377,6 +381,7 @@ bool TColumnShardScan::SendResult(bool pageFault, bool lastBatch) {
377
381
}
378
382
ReadMetadataRange->OnReplyConstruction (TabletId, *Result);
379
383
AckReceivedInstant.reset ();
384
+ LastResultInstant = TMonotonic::Now ();
380
385
381
386
Send (ScanComputeActorId, Result.Release (), IEventHandle::FlagTrackDelivery); // TODO: FlagSubscribeOnSession ?
382
387
@@ -414,4 +419,17 @@ void TColumnShardScan::ReportStats() {
414
419
Bytes = 0 ;
415
420
}
416
421
422
+ void TColumnShardScan::ScheduleWakeup (const TMonotonic deadline) {
423
+ if (deadline != TMonotonic::Max ()) {
424
+ Schedule (deadline, new TEvents::TEvWakeup);
425
+ }
426
+ }
427
+
428
+ TMonotonic TColumnShardScan::GetDeadline () const {
429
+ AFL_VERIFY (StartInstant);
430
+ if (LastResultInstant) {
431
+ return *LastResultInstant + Timeout;
432
+ }
433
+ return *StartInstant + Timeout;
434
+ }
417
435
}
0 commit comments