@@ -497,6 +497,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
497
497
for (auto * node : unimportantNodes) {
498
498
node->Ping ();
499
499
}
500
+ ProcessNodePingQueue ();
500
501
TVector<TTabletId> tabletsToReleaseFromParent;
501
502
TSideEffects sideEffects;
502
503
sideEffects.Reset (SelfId ());
@@ -687,11 +688,13 @@ void THive::Cleanup() {
687
688
688
689
void THive::Handle (TEvLocal::TEvStatus::TPtr& ev) {
689
690
BLOG_D (" Handle TEvLocal::TEvStatus for Node " << ev->Sender .NodeId () << " : " << ev->Get ()->Record .ShortDebugString ());
691
+ RemoveFromPingInProgress (ev->Sender .NodeId ());
690
692
Execute (CreateStatus (ev->Sender , ev->Get ()->Record ));
691
693
}
692
694
693
695
void THive::Handle (TEvLocal::TEvSyncTablets::TPtr& ev) {
694
696
BLOG_D (" THive::Handle::TEvSyncTablets" );
697
+ RemoveFromPingInProgress (ev->Sender .NodeId ());
695
698
Execute (CreateSyncTablets (ev->Sender , ev->Get ()->Record ));
696
699
}
697
700
@@ -745,6 +748,7 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
745
748
void THive::Handle (TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
746
749
TNodeId nodeId = ev->Get ()->NodeId ;
747
750
BLOG_W (" Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
751
+ RemoveFromPingInProgress (nodeId);
748
752
if (ConnectedNodes.erase (nodeId)) {
749
753
UpdateCounterNodesConnected (-1 );
750
754
}
@@ -917,6 +921,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
917
921
case TEvLocal::EvPing: {
918
922
TNodeId nodeId = ev->Cookie ;
919
923
TNodeInfo* node = FindNode (nodeId);
924
+ NodePingsInProgress.erase (nodeId);
920
925
if (node != nullptr && ev->Sender == node->Local ) {
921
926
if (node->IsDisconnecting ()) {
922
927
// ping continiousily until we fully disconnected from the node
@@ -925,6 +930,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
925
930
KillNode (node->Id , node->Local );
926
931
}
927
932
}
933
+ ProcessNodePingQueue ();
928
934
break ;
929
935
}
930
936
};
@@ -1696,6 +1702,13 @@ void THive::UpdateCounterTabletsStarting(i64 tabletsStartingDiff) {
1696
1702
}
1697
1703
}
1698
1704
1705
+ void THive::UpdateCounterPingQueueSize () {
1706
+ if (TabletCounters != nullptr ) {
1707
+ auto & counter = TabletCounters->Simple ()[NHive::COUNTER_PINGQUEUE_SIZE];
1708
+ counter.Set (NodePingQueue.size ());
1709
+ }
1710
+ }
1711
+
1699
1712
void THive::RecordTabletMove (const TTabletMoveInfo& moveInfo) {
1700
1713
TabletMoveHistory.PushBack (moveInfo);
1701
1714
TabletCounters->Cumulative ()[NHive::COUNTER_TABLETS_MOVED].Increment (1 );
@@ -2672,6 +2685,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
2672
2685
Execute (CreateStartTablet (tabletId, local, cookie, external));
2673
2686
}
2674
2687
2688
+ void THive::QueuePing (const TActorId& local) {
2689
+ NodePingQueue.push (local);
2690
+ }
2691
+
2692
+ void THive::ProcessNodePingQueue () {
2693
+ while (!NodePingQueue.empty () && NodePingsInProgress.size () < GetMaxPingsInFlight ()) {
2694
+ TActorId local = NodePingQueue.front ();
2695
+ TNodeId node = local.NodeId ();
2696
+ NodePingQueue.pop ();
2697
+ NodePingsInProgress.insert (node);
2698
+ SendPing (local, node);
2699
+ }
2700
+ }
2701
+
2702
+ void THive::RemoveFromPingInProgress (TNodeId node) {
2703
+ NodePingsInProgress.erase (node);
2704
+ ProcessNodePingQueue ();
2705
+ }
2706
+
2675
2707
void THive::SendPing (const TActorId& local, TNodeId id) {
2676
2708
Send (local,
2677
2709
new TEvLocal::TEvPing (HiveId,
0 commit comments