@@ -77,27 +77,98 @@ struct TFollowerInfo : public TAtomicRefCount<TGuardedInfo> {
77
77
{}
78
78
};
79
79
80
- class TReplicaGuardian : public TActorBootstrapped <TReplicaGuardian> {
81
- TIntrusiveConstPtr<TGuardedInfo> Info;
80
+ template <typename TDerived>
81
+ class TBaseGuardian : public TActorBootstrapped <TDerived> {
82
+ protected:
82
83
const TActorId Replica;
83
84
const TActorId Guard;
84
85
85
- ui64 Signature;
86
- TInstant DowntimeFrom;
86
+ TInstant DowntimeFrom = TInstant::Max();
87
87
ui64 LastCookie = 0 ;
88
+ bool ReplicaMissingReported = false ;
89
+ TMonotonic LastReplicaMissing = TMonotonic::Max();
90
+
91
+ TBaseGuardian (TActorId replica, TActorId guard)
92
+ : Replica(replica)
93
+ , Guard(guard)
94
+ {}
95
+
96
+ void Gone () {
97
+ TDerived::Send (Guard, new TEvents::TEvGone ());
98
+ PassAway ();
99
+ }
88
100
89
101
void PassAway () override {
90
- if (Replica.NodeId () != SelfId ().NodeId ())
91
- Send (TActivationContext::InterconnectProxy (Replica.NodeId ()), new TEvents::TEvUnsubscribe);
102
+ if (Replica.NodeId () != TDerived:: SelfId ().NodeId ())
103
+ TDerived:: Send (TActivationContext::InterconnectProxy (Replica.NodeId ()), new TEvents::TEvUnsubscribe);
92
104
93
105
if (KIKIMR_ALLOW_SSREPLICA_PROBES) {
94
106
const TActorId ssProxyId = MakeStateStorageProxyID ();
95
- Send (ssProxyId, new TEvStateStorage::TEvReplicaProbeUnsubscribe (Replica));
107
+ TDerived:: Send (ssProxyId, new TEvStateStorage::TEvReplicaProbeUnsubscribe (Replica));
96
108
}
97
109
98
- TActor ::PassAway ();
110
+ TActorBootstrapped<TDerived> ::PassAway ();
99
111
}
100
112
113
+ void ReplicaMissing (bool value) {
114
+ if (ReplicaMissingReported < value) {
115
+ const TMonotonic now = TActivationContext::Monotonic ();
116
+ if (LastReplicaMissing == TMonotonic::Max ()) {
117
+ // this if the first time in row we report replica missing
118
+ LastReplicaMissing = now;
119
+ } else {
120
+ // make it actually "missing" only after a specific amount of time
121
+ value = LastReplicaMissing + TDuration::Seconds (3 ) < now;
122
+ }
123
+ } else if (value < ReplicaMissingReported) {
124
+ LastReplicaMissing = TMonotonic::Max ();
125
+ }
126
+ if (value != ReplicaMissingReported) {
127
+ TDerived::Send (Guard, new TEvPrivate::TEvReplicaMissing (value));
128
+ ReplicaMissingReported = true ;
129
+ }
130
+ }
131
+
132
+ void Handle (TEvents::TEvUndelivered::TPtr& ev) {
133
+ if (ev->Cookie == LastCookie) {
134
+ ReplicaMissing (true );
135
+ SomeSleep ();
136
+ }
137
+ }
138
+
139
+ void HandleThenSomeSleep (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
140
+ if (ev->Cookie == LastCookie) {
141
+ ++LastCookie;
142
+ SomeSleep ();
143
+ }
144
+ }
145
+
146
+ void HandleThenRequestInfo (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
147
+ if (ev->Cookie == LastCookie) {
148
+ ++LastCookie;
149
+ static_cast <TDerived&>(*this ).RequestInfo ();
150
+ }
151
+ }
152
+
153
+ void SomeSleep () {
154
+ const TInstant now = TActivationContext::Now ();
155
+ if (DowntimeFrom > now) {
156
+ DowntimeFrom = now;
157
+ } else if (DowntimeFrom + TDuration::Seconds (15 ) < now) {
158
+ return Gone ();
159
+ }
160
+
161
+ TDerived::Become (&TDerived::StateSleep, TDuration::MilliSeconds (250 ), new TEvents::TEvWakeup ());
162
+ }
163
+ };
164
+
165
+ class TReplicaGuardian : public TBaseGuardian <TReplicaGuardian> {
166
+ TIntrusiveConstPtr<TGuardedInfo> Info;
167
+
168
+ ui64 Signature;
169
+
170
+ friend class TBaseGuardian ;
171
+
101
172
void RequestInfo () {
102
173
if (KIKIMR_ALLOW_SSREPLICA_PROBES) {
103
174
const TActorId ssProxyId = MakeStateStorageProxyID ();
@@ -130,49 +201,6 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
130
201
Become (&TThis::StateUpdate);
131
202
}
132
203
133
- void Gone () {
134
- Send (Guard, new TEvents::TEvGone ());
135
- PassAway ();
136
- }
137
-
138
- void Handle (TEvents::TEvUndelivered::TPtr& ev) {
139
- if (ev->Cookie == LastCookie) {
140
- // We could not deliver the last message, report to guardian that
141
- // this replica is missing. We don't do anything else, as this
142
- // error is assumed permanent until we disconnect, in which case
143
- // we assume the target node may have been restarted and
144
- // reconfigured.
145
- Send (Guard, new TEvPrivate::TEvReplicaMissing (true ));
146
- }
147
- }
148
-
149
- void HandleThenSomeSleep (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
150
- if (ev->Cookie == LastCookie) {
151
- ++LastCookie;
152
- Send (Guard, new TEvPrivate::TEvReplicaMissing (false ));
153
- SomeSleep ();
154
- }
155
- }
156
-
157
- void HandleThenRequestInfo (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
158
- if (ev->Cookie == LastCookie) {
159
- ++LastCookie;
160
- Send (Guard, new TEvPrivate::TEvReplicaMissing (false ));
161
- RequestInfo ();
162
- }
163
- }
164
-
165
- void SomeSleep () {
166
- const TInstant now = TActivationContext::Now ();
167
- if (DowntimeFrom > now) {
168
- DowntimeFrom = now;
169
- } else if (DowntimeFrom + TDuration::Seconds (15 ) < now) {
170
- return Gone ();
171
- }
172
-
173
- Become (&TThis::StateSleep, TDuration::MilliSeconds (250 ), new TEvents::TEvWakeup ());
174
- }
175
-
176
204
void Demoted () {
177
205
Send (Info->Leader , new TEvTablet::TEvDemoted (false ));
178
206
return PassAway ();
@@ -189,6 +217,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
189
217
Signature = record.GetSignature ();
190
218
191
219
DowntimeFrom = TInstant::Max ();
220
+ ReplicaMissing (false );
192
221
193
222
if (status == NKikimrProto::OK) {
194
223
const ui32 gen = record.GetCurrentGeneration ();
@@ -223,11 +252,9 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
223
252
}
224
253
225
254
TReplicaGuardian (TGuardedInfo *info, TActorId replica, TActorId guard)
226
- : Info(info)
227
- , Replica(replica)
228
- , Guard(guard)
255
+ : TBaseGuardian(replica, guard)
256
+ , Info(info)
229
257
, Signature(0 )
230
- , DowntimeFrom(TInstant::Max())
231
258
{}
232
259
233
260
void Bootstrap () {
@@ -240,7 +267,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
240
267
cFunc (TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest);
241
268
cFunc (TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone);
242
269
cFunc (TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
243
- hFunc (TEvents::TEvUndelivered, Handle );
270
+ hFunc (TEvents::TEvUndelivered, TBaseGuardian:: Handle );
244
271
hFunc (TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep);
245
272
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
246
273
}
@@ -250,7 +277,7 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
250
277
switch (ev->GetTypeRewrite ()) {
251
278
hFunc (TEvStateStorage::TEvReplicaInfo, Handle );
252
279
cFunc (TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
253
- hFunc (TEvents::TEvUndelivered, Handle );
280
+ hFunc (TEvents::TEvUndelivered, TBaseGuardian:: Handle );
254
281
hFunc (TEvInterconnect::TEvNodeDisconnected, HandleThenRequestInfo);
255
282
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
256
283
}
@@ -268,20 +295,15 @@ class TReplicaGuardian : public TActorBootstrapped<TReplicaGuardian> {
268
295
switch (ev->GetTypeRewrite ()) {
269
296
hFunc (TEvStateStorage::TEvReplicaInfo, Handle );
270
297
cFunc (TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
271
- hFunc (TEvents::TEvUndelivered, Handle );
298
+ hFunc (TEvents::TEvUndelivered, TBaseGuardian:: Handle );
272
299
hFunc (TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep);
273
300
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
274
301
}
275
302
}
276
303
};
277
304
278
- class TFollowerGuardian : public TActorBootstrapped <TFollowerGuardian> {
305
+ class TFollowerGuardian : public TBaseGuardian <TFollowerGuardian> {
279
306
TIntrusiveConstPtr<TFollowerInfo> Info;
280
- const TActorId Replica;
281
- const TActorId Guard;
282
-
283
- TInstant DowntimeFrom;
284
- ui64 LastCookie = 0 ;
285
307
286
308
void RefreshInfo (TEvPrivate::TEvRefreshFollowerState::TPtr &ev) {
287
309
Info = ev->Get ()->FollowerInfo ;
@@ -312,67 +334,23 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> {
312
334
Become (&TThis::StateCalm);
313
335
}
314
336
315
- void Handle (TEvents::TEvUndelivered::TPtr& ev) {
316
- if (ev->Cookie == LastCookie) {
317
- // We could not deliver the last message, report to guardian that
318
- // this replica is missing. We don't do anything else, as this
319
- // error is assumed permanent until we disconnect, in which case
320
- // we assume the target node may have been restarted and
321
- // reconfigured.
322
- Send (Guard, new TEvPrivate::TEvReplicaMissing (true ));
323
- }
324
- }
325
-
326
- void Handle (TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
327
- if (ev->Cookie == LastCookie) {
328
- ++LastCookie;
329
- Send (Guard, new TEvPrivate::TEvReplicaMissing (false ));
330
- SomeSleep ();
331
- }
332
- }
333
-
334
- void SomeSleep () {
335
- const TInstant now = TActivationContext::Now ();
336
- if (DowntimeFrom > now) {
337
- DowntimeFrom = now;
338
- } else if (DowntimeFrom + TDuration::Seconds (15 ) < now) {
339
- return Gone ();
340
- }
341
-
342
- Become (&TThis::StateSleep, TDuration::MilliSeconds (250 ), new TEvents::TEvWakeup ());
343
- }
344
-
345
337
void PassAway () override {
346
338
Send (Replica, new TEvStateStorage::TEvReplicaUnregFollower (Info->TabletID , Info->Follower ));
347
- if (Replica.NodeId () != SelfId ().NodeId ())
348
- Send (TActivationContext::InterconnectProxy (Replica.NodeId ()), new TEvents::TEvUnsubscribe ());
349
-
350
- if (KIKIMR_ALLOW_SSREPLICA_PROBES) {
351
- const TActorId ssProxyId = MakeStateStorageProxyID ();
352
- Send (ssProxyId, new TEvStateStorage::TEvReplicaProbeUnsubscribe (Replica));
353
- }
354
-
355
- TActor::PassAway ();
356
- }
357
-
358
- void Gone () {
359
- Send (Guard, new TEvents::TEvGone ());
360
- PassAway ();
339
+ TBaseGuardian::PassAway ();
361
340
}
362
341
363
342
void Ping () {
364
343
DowntimeFrom = TInstant::Max ();
365
344
}
345
+
366
346
public:
367
347
static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
368
348
return NKikimrServices::TActivity::SS_REPLICA_GUARDIAN;
369
349
}
370
350
371
351
TFollowerGuardian (TFollowerInfo *info, const TActorId replica, const TActorId guard)
372
- : Info(info)
373
- , Replica(replica)
374
- , Guard(guard)
375
- , DowntimeFrom(TInstant::Max())
352
+ : TBaseGuardian(replica, guard)
353
+ , Info(info)
376
354
{}
377
355
378
356
void Bootstrap () {
@@ -384,8 +362,8 @@ class TFollowerGuardian : public TActorBootstrapped<TFollowerGuardian> {
384
362
hFunc (TEvPrivate::TEvRefreshFollowerState, UpdateInfo);
385
363
cFunc (TEvStateStorage::TEvReplicaProbeConnected::EventType, MakeRequest);
386
364
cFunc (TEvStateStorage::TEvReplicaProbeDisconnected::EventType, Gone);
387
- hFunc (TEvents::TEvUndelivered, Handle );
388
- hFunc (TEvInterconnect::TEvNodeDisconnected, Handle );
365
+ hFunc (TEvents::TEvUndelivered, TBaseGuardian:: Handle );
366
+ hFunc (TEvInterconnect::TEvNodeDisconnected, HandleThenSomeSleep );
389
367
cFunc (TEvTablet::TEvPing::EventType, Ping);
390
368
cFunc (TEvents::TEvPoisonPill::EventType, PassAway);
391
369
cFunc (TEvStateStorage::TEvReplicaShutdown::EventType, Gone);
@@ -425,11 +403,18 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
425
403
return PassAway ();
426
404
}
427
405
406
+ void PassAway () override {
407
+ const TActorId proxyActorID = MakeStateStorageProxyID ();
408
+ TActivationContext::Send (new IEventHandle (TEvents::TSystem::Unsubscribe, 0 , proxyActorID, SelfId (), nullptr , 0 ));
409
+ TActorBootstrapped::PassAway ();
410
+ }
411
+
428
412
void Handle (TEvStateStorage::TEvResolveReplicasList::TPtr &ev) {
429
413
const TVector<TActorId> &replicasList = ev->Get ()->Replicas ;
430
414
Y_ABORT_UNLESS (!replicasList.empty (), " must not happens, guardian must be created over active tablet" );
431
415
432
416
const ui32 replicaSz = replicasList.size ();
417
+ Y_ABORT_UNLESS (ReplicaGuardians.empty () || ReplicaGuardians.size () == replicaSz);
433
418
434
419
TVector<std::pair<TActorId, TActorId>> updatedReplicaGuardians;
435
420
updatedReplicaGuardians.reserve (replicaSz);
@@ -519,16 +504,16 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
519
504
return ret; // true on erase, false on outdated notify
520
505
}
521
506
522
- void SendResolveRequest (TDuration delay) {
507
+ void SendResolveRequest (TDuration delay, bool initial ) {
523
508
const ui64 tabletId = Info ? Info->TabletID : FollowerInfo->TabletID ;
524
509
const TActorId proxyActorID = MakeStateStorageProxyID ();
525
510
526
511
if (delay == TDuration::Zero ()) {
527
- Send (proxyActorID, new TEvStateStorage::TEvResolveReplicas (tabletId), IEventHandle::FlagTrackDelivery);
512
+ Send (proxyActorID, new TEvStateStorage::TEvResolveReplicas (tabletId, initial ), IEventHandle::FlagTrackDelivery);
528
513
} else {
529
514
TActivationContext::Schedule (
530
515
delay,
531
- new IEventHandle (proxyActorID, SelfId (), new TEvStateStorage::TEvResolveReplicas (tabletId), IEventHandle::FlagTrackDelivery)
516
+ new IEventHandle (proxyActorID, SelfId (), new TEvStateStorage::TEvResolveReplicas (tabletId, initial ), IEventHandle::FlagTrackDelivery)
532
517
);
533
518
}
534
519
@@ -543,7 +528,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
543
528
void HandleGoneCalm (TEvents::TEvGone::TPtr &ev) {
544
529
if (ReplicaDown (ev->Sender )) {
545
530
const ui64 rndDelay = AppData ()->RandomProvider ->GenRand () % 150 ;
546
- SendResolveRequest (TDuration::MilliSeconds (150 + rndDelay));
531
+ SendResolveRequest (TDuration::MilliSeconds (150 + rndDelay), false );
547
532
}
548
533
}
549
534
@@ -641,7 +626,7 @@ class TTabletGuardian : public TActorBootstrapped<TTabletGuardian> {
641
626
{}
642
627
643
628
void Bootstrap () {
644
- SendResolveRequest (TDuration::Zero ());
629
+ SendResolveRequest (TDuration::Zero (), true );
645
630
}
646
631
647
632
STATEFN (StateResolve) {
0 commit comments