@@ -19,43 +19,38 @@ static void ResetInterconnectProxyConfig(ui32 nodeId, const TActorContext &ctx)
19
19
20
20
void TDynamicNodeResolverBase::Bootstrap (const TActorContext &ctx)
21
21
{
22
- auto dinfo = AppData (ctx)->DomainsInfo ;
23
-
24
- NTabletPipe::TClientRetryPolicy retryPolicy = {
25
- .RetryLimitCount = 12 ,
26
- .MinRetryTime = TDuration::MilliSeconds (50 ),
27
- .MaxRetryTime = TDuration::Seconds (2 )
28
- };
22
+ LOG_D (" New cache miss: nodeId=" << NodeId << " , deadline=" << Deadline);
29
23
30
- auto pipe = NTabletPipe::CreateClient (ctx.SelfID , MakeNodeBrokerID (), NTabletPipe::TClientConfig (retryPolicy));
31
- NodeBrokerPipe = ctx.RegisterWithSameMailbox (pipe);
24
+ OpenPipe (ctx);
32
25
33
26
TAutoPtr<TEvNodeBroker::TEvResolveNode> request = new TEvNodeBroker::TEvResolveNode;
34
27
request->Record .SetNodeId (NodeId);
35
- NTabletPipe::SendData (ctx, NodeBrokerPipe, request.Release ());
28
+ NTabletPipe::SendData (ctx, Config-> NodeBrokerPipe , request.Release ());
36
29
37
30
Become (&TDynamicNodeResolverBase::StateWork);
38
- if (Deadline != TInstant::Max ()) {
39
- Schedule (Deadline, new TEvents::TEvWakeup);
31
+ bool newEarliestDeadline = Config->PendingCacheMisses .empty () || Config->PendingCacheMisses .top ().Deadline > Deadline;
32
+ if (Deadline != TInstant::Max () && newEarliestDeadline) {
33
+ LOG_D (" Schedule wakeup for new earliest deadline " << Deadline);
34
+ ctx.Schedule (Deadline, std::make_unique<IEventHandle>(Owner, TActorId (), new TEvents::TEvWakeup));
40
35
}
36
+ Config->PendingCacheMisses .emplace (SelfId (), Deadline);
41
37
}
42
38
43
- void TDynamicNodeResolverBase::Die (const TActorContext &ctx)
44
- {
45
- if (NodeBrokerPipe)
46
- NTabletPipe::CloseClient (ctx, NodeBrokerPipe);
47
- TBase::Die (ctx);
48
- }
49
-
50
- void TDynamicNodeResolverBase::ReplyWithErrorAndDie (const TActorContext &ctx)
39
+ void TDynamicNodeResolverBase::ReplyWithErrorAndDie (const TString& error, const TActorContext &ctx)
51
40
{
41
+ LOG_D (" Cache miss failed: nodeId=" << NodeId << " , error=" << error);
52
42
OnError (ctx);
53
43
Die (ctx);
54
44
}
55
45
56
46
void TDynamicNodeResolverBase::Handle (TEvNodeBroker::TEvResolvedNode::TPtr &ev, const TActorContext &ctx)
57
47
{
58
48
auto &rec = ev->Get ()->Record ;
49
+
50
+ LOG_D (" Handle TEvNodeBroker::TEvResolvedNode("
51
+ << " nodeId=" << NodeId
52
+ << " , status=" << rec.GetStatus ().GetCode () << " )" );
53
+
59
54
TDynamicConfig::TDynamicNodeInfo oldNode;
60
55
auto it = Config->DynamicNodes .find (NodeId);
61
56
bool exists = it != Config->DynamicNodes .end ();
@@ -71,7 +66,7 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
71
66
ResetInterconnectProxyConfig (NodeId, ctx);
72
67
ListNodesCache->Invalidate (); // node was erased
73
68
}
74
- ReplyWithErrorAndDie (ctx);
69
+ ReplyWithErrorAndDie (" Unknown node " , ctx);
75
70
return ;
76
71
}
77
72
@@ -85,14 +80,30 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
85
80
ResetInterconnectProxyConfig (NodeId, ctx);
86
81
Config->DynamicNodes .emplace (NodeId, node);
87
82
83
+ LOG_D (" Cache miss succeed: nodeId=" << NodeId);
88
84
OnSuccess (ctx);
89
85
Die (ctx);
90
86
}
91
87
92
88
void TDynamicNodeResolverBase::Handle (TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx)
93
89
{
94
90
if (ev->Get ()->Status != NKikimrProto::OK)
95
- ReplyWithErrorAndDie (ctx);
91
+ ReplyWithErrorAndDie (" Pipe failed to connect" , ctx);
92
+ }
93
+
94
+ void TDynamicNodeResolverBase::HandleWakeup (const TActorContext &ctx) {
95
+ ReplyWithErrorAndDie (" Deadline exceeded" , ctx);
96
+ }
97
+
98
+ void TDynamicNodeResolverBase::HandleClientDestroyed (const TActorContext &ctx) {
99
+ ReplyWithErrorAndDie (" Pipe was destroyed" , ctx);
100
+ }
101
+
102
+ void TDynamicNodeResolverBase::OpenPipe (const TActorContext &ctx) {
103
+ if (!Config->NodeBrokerPipe ) {
104
+ auto pipe = NTabletPipe::CreateClient (Owner, MakeNodeBrokerID ());
105
+ Config->NodeBrokerPipe = ctx.RegisterWithSameMailbox (pipe);
106
+ }
96
107
}
97
108
98
109
void TDynamicNodeResolver::OnSuccess (const TActorContext &ctx)
@@ -180,19 +191,19 @@ void TDynamicNameserver::ReplaceNameserverSetup(TIntrusivePtr<TTableNameserverSe
180
191
181
192
void TDynamicNameserver::Die (const TActorContext &ctx)
182
193
{
183
- for (auto &pipe : NodeBrokerPipes ) {
184
- if (pipe )
185
- NTabletPipe::CloseClient (ctx, pipe );
194
+ for (auto &config : DynamicConfigs ) {
195
+ if (config-> NodeBrokerPipe )
196
+ NTabletPipe::CloseClient (ctx, config-> NodeBrokerPipe );
186
197
}
187
198
TBase::Die (ctx);
188
199
}
189
200
190
201
void TDynamicNameserver::OpenPipe (ui32 domain,
191
202
const TActorContext &ctx)
192
203
{
193
- if (!NodeBrokerPipes [domain]) {
204
+ if (!DynamicConfigs [domain]-> NodeBrokerPipe ) {
194
205
auto pipe = NTabletPipe::CreateClient (ctx.SelfID , MakeNodeBrokerID ());
195
- NodeBrokerPipes [domain] = ctx.RegisterWithSameMailbox (pipe);
206
+ DynamicConfigs [domain]-> NodeBrokerPipe = ctx.RegisterWithSameMailbox (pipe);
196
207
}
197
208
}
198
209
@@ -204,7 +215,7 @@ void TDynamicNameserver::RequestEpochUpdate(ui32 domain,
204
215
205
216
TAutoPtr<TEvNodeBroker::TEvListNodes> request = new TEvNodeBroker::TEvListNodes;
206
217
request->Record .SetMinEpoch (epoch);
207
- NTabletPipe::SendData (ctx, NodeBrokerPipes [domain], request.Release ());
218
+ NTabletPipe::SendData (ctx, DynamicConfigs [domain]-> NodeBrokerPipe , request.Release ());
208
219
EpochUpdates[domain] = epoch;
209
220
}
210
221
@@ -346,22 +357,29 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
346
357
}
347
358
}
348
359
349
- void TDynamicNameserver::OnPipeDestroyed (ui32 domain,
350
- const TActorContext &ctx)
360
+ template < typename TEv>
361
+ void TDynamicNameserver::OnPipeDestroyed (TAutoPtr<TEventHandle<TEv>> &ev, ui32 domain, const TActorContext &ctx)
351
362
{
352
- NodeBrokerPipes [domain] = TActorId ();
363
+ DynamicConfigs [domain]-> NodeBrokerPipe = TActorId ();
353
364
PendingRequestAnswered (domain, ctx);
354
365
355
366
if (EpochUpdates.contains (domain)) {
356
367
ctx.Schedule (TDuration::Seconds (1 ),
357
368
new TEvPrivate::TEvUpdateEpoch (domain, EpochUpdates.at (domain)));
358
369
EpochUpdates.erase (domain);
359
370
}
371
+
372
+ for (const auto &cacheMiss : DynamicConfigs[domain]->PendingCacheMisses .Container ()) {
373
+ Send (cacheMiss.RequestActor , new TEv (*ev->Get ()));
374
+ }
375
+ DynamicConfigs[domain]->PendingCacheMisses .clear ();
360
376
}
361
377
362
378
void TDynamicNameserver::Handle (TEvInterconnect::TEvResolveNode::TPtr &ev,
363
379
const TActorContext &ctx)
364
380
{
381
+ LOG_D (" Handle TEvInterconnect::TEvResolveNode(id=" << ev->Get ()->Record .GetNodeId () << " )" );
382
+
365
383
auto & record = ev->Get ()->Record ;
366
384
const ui32 nodeId = record.GetNodeId ();
367
385
const TInstant deadline = record.HasDeadline () ? TInstant::FromValue (record.GetDeadline ()) : TInstant::Max ();
@@ -391,7 +409,7 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvListNodes::TPtr &ev,
391
409
OpenPipe (domain, ctx);
392
410
TAutoPtr<TEvNodeBroker::TEvListNodes> request = new TEvNodeBroker::TEvListNodes;
393
411
request->Record .SetCachedVersion (DynamicConfigs[domain]->Epoch .Version );
394
- NTabletPipe::SendData (ctx, NodeBrokerPipes [domain], request.Release ());
412
+ NTabletPipe::SendData (ctx, DynamicConfigs [domain]-> NodeBrokerPipe , request.Release ());
395
413
PendingRequests.Set (domain);
396
414
}
397
415
}
@@ -403,6 +421,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvListNodes::TPtr &ev,
403
421
404
422
void TDynamicNameserver::Handle (TEvInterconnect::TEvGetNode::TPtr &ev, const TActorContext &ctx)
405
423
{
424
+ LOG_D (" Handle TEvInterconnect::TEvGetNode(id=" << ev->Get ()->NodeId << " )" );
425
+
406
426
ui32 nodeId = ev->Get ()->NodeId ;
407
427
THolder<TEvInterconnect::TEvNodeInfo> reply (new TEvInterconnect::TEvNodeInfo (nodeId));
408
428
auto config = AppData (ctx)->DynamicNameserviceConfig ;
@@ -435,18 +455,21 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
435
455
436
456
void TDynamicNameserver::Handle (TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx)
437
457
{
458
+ LOG_D (" Handle TEvTabletPipe::TEvClientDestroyed" );
438
459
ui32 domain = AppData ()->DomainsInfo ->GetDomain ()->DomainUid ;
439
- if (NodeBrokerPipes[domain] == ev->Get ()->ClientId )
440
- OnPipeDestroyed (domain, ctx);
460
+ if (DynamicConfigs[domain]->NodeBrokerPipe == ev->Get ()->ClientId ) {
461
+ OnPipeDestroyed (ev, domain, ctx);
462
+ }
441
463
}
442
464
443
465
void TDynamicNameserver::Handle (TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx)
444
466
{
467
+ LOG_D (" Handle TEvTabletPipe::TEvClientConnected(status=" << ev->Get ()->Status << " )" );
445
468
if (ev->Get ()->Status != NKikimrProto::OK) {
446
469
ui32 domain = AppData (ctx)->DomainsInfo ->GetDomain ()->DomainUid ;
447
- if (NodeBrokerPipes [domain] == ev->Get ()->ClientId ) {
448
- NTabletPipe::CloseClient (ctx, NodeBrokerPipes [domain]);
449
- OnPipeDestroyed (domain, ctx);
470
+ if (DynamicConfigs [domain]-> NodeBrokerPipe == ev->Get ()->ClientId ) {
471
+ NTabletPipe::CloseClient (ctx, DynamicConfigs [domain]-> NodeBrokerPipe );
472
+ OnPipeDestroyed (ev, domain, ctx);
450
473
}
451
474
}
452
475
}
@@ -495,6 +518,26 @@ void TDynamicNameserver::Handle(TEvents::TEvUnsubscribe::TPtr ev) {
495
518
StaticNodeChangeSubscribers.erase (ev->Sender );
496
519
}
497
520
521
+ void TDynamicNameserver::HandleWakeup (const TActorContext &ctx) {
522
+ auto now = ctx.Now ();
523
+ LOG_D (" HandleWakeup at " << now);
524
+
525
+ ui32 domain = AppData ()->DomainsInfo ->GetDomain ()->DomainUid ;
526
+ auto &pendingCacheMisses = DynamicConfigs[domain]->PendingCacheMisses ;
527
+
528
+ while (!pendingCacheMisses.empty () && pendingCacheMisses.top ().Deadline <= now) {
529
+ const auto &top = pendingCacheMisses.top ();
530
+ Send (top.RequestActor , new TEvents::TEvWakeup);
531
+ pendingCacheMisses.pop ();
532
+ }
533
+
534
+ if (!pendingCacheMisses.empty () && pendingCacheMisses.top ().Deadline != TInstant::Max ()) {
535
+ auto deadline = pendingCacheMisses.top ().Deadline ;
536
+ LOG_D (" Schedule next wakeup at " << deadline);
537
+ Schedule (deadline, new TEvents::TEvWakeup);
538
+ }
539
+ }
540
+
498
541
IActor *CreateDynamicNameserver (const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 poolId) {
499
542
return new TDynamicNameserver (setup, poolId);
500
543
}
0 commit comments