Skip to content

Commit b665ef7

Browse files
authored
control inflight pings in hive (#6916) (#7237)
1 parent 2d3a63b commit b665ef7

File tree

6 files changed

+51
-2
lines changed

6 files changed

+51
-2
lines changed

ydb/core/mind/hive/hive_impl.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
495495
for (auto* node : unimportantNodes) {
496496
node->Ping();
497497
}
498+
ProcessNodePingQueue();
498499
TVector<TTabletId> tabletsToReleaseFromParent;
499500
TSideEffects sideEffects;
500501
sideEffects.Reset(SelfId());
@@ -685,11 +686,13 @@ void THive::Cleanup() {
685686

686687
void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) {
687688
BLOG_D("Handle TEvLocal::TEvStatus for Node " << ev->Sender.NodeId() << ": " << ev->Get()->Record.ShortDebugString());
689+
RemoveFromPingInProgress(ev->Sender.NodeId());
688690
Execute(CreateStatus(ev->Sender, ev->Get()->Record));
689691
}
690692

691693
void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) {
692694
BLOG_D("THive::Handle::TEvSyncTablets");
695+
RemoveFromPingInProgress(ev->Sender.NodeId());
693696
Execute(CreateSyncTablets(ev->Sender, ev->Get()->Record));
694697
}
695698

@@ -742,7 +745,10 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
742745
void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
743746
TNodeId nodeId = ev->Get()->NodeId;
744747
BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
745-
ConnectedNodes.erase(nodeId);
748+
RemoveFromPingInProgress(nodeId);
749+
if (ConnectedNodes.erase(nodeId)) {
750+
UpdateCounterNodesConnected(-1);
751+
}
746752
Execute(CreateDisconnectNode(THolder<TEvInterconnect::TEvNodeDisconnected>(ev->Release().Release())));
747753
}
748754

@@ -912,6 +918,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
912918
case TEvLocal::EvPing: {
913919
TNodeId nodeId = ev->Cookie;
914920
TNodeInfo* node = FindNode(nodeId);
921+
NodePingsInProgress.erase(nodeId);
915922
if (node != nullptr && ev->Sender == node->Local) {
916923
if (node->IsDisconnecting()) {
917924
// ping continiousily until we fully disconnected from the node
@@ -920,6 +927,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
920927
KillNode(node->Id, node->Local);
921928
}
922929
}
930+
ProcessNodePingQueue();
923931
break;
924932
}
925933
};
@@ -1684,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) {
16841692
}
16851693
}
16861694

1695+
void THive::UpdateCounterPingQueueSize() {
1696+
if (TabletCounters != nullptr) {
1697+
auto& counter = TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE];
1698+
counter.Set(NodePingQueue.size());
1699+
}
1700+
}
1701+
16871702
void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) {
16881703
TabletMoveHistory.PushBack(moveInfo);
16891704
TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1);
@@ -2648,6 +2663,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
26482663
Execute(CreateStartTablet(tabletId, local, cookie, external));
26492664
}
26502665

2666+
void THive::QueuePing(const TActorId& local) {
2667+
NodePingQueue.push(local);
2668+
}
2669+
2670+
void THive::ProcessNodePingQueue() {
2671+
while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) {
2672+
TActorId local = NodePingQueue.front();
2673+
TNodeId node = local.NodeId();
2674+
NodePingQueue.pop();
2675+
NodePingsInProgress.insert(node);
2676+
SendPing(local, node);
2677+
}
2678+
}
2679+
2680+
void THive::RemoveFromPingInProgress(TNodeId node) {
2681+
NodePingsInProgress.erase(node);
2682+
ProcessNodePingQueue();
2683+
}
2684+
26512685
void THive::SendPing(const TActorId& local, TNodeId id) {
26522686
Send(local,
26532687
new TEvLocal::TEvPing(HiveId,

ydb/core/mind/hive/hive_impl.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
409409
TOwnershipKeeper Keeper;
410410
TEventPriorityQueue<THive> EventQueue{*this};
411411
std::vector<TActorId> ActorsWaitingToMoveTablets;
412+
std::queue<TActorId> NodePingQueue;
413+
std::unordered_set<TNodeId> NodePingsInProgress;
412414

413415
struct TPendingCreateTablet {
414416
NKikimrHive::TEvCreateTablet CreateTablet;
@@ -642,6 +644,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
642644
void UpdateCounterBootQueueSize(ui64 bootQueueSize);
643645
void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
644646
void UpdateCounterNodesConnected(i64 nodesConnectedDiff);
647+
void UpdateCounterPingQueueSize();
645648
void RecordTabletMove(const TTabletMoveInfo& info);
646649
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
647650
void ProcessBootQueue();
@@ -670,7 +673,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
670673
void UpdateRegisteredDataCenters();
671674
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
672675
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
676+
void QueuePing(const TActorId& local);
673677
void SendPing(const TActorId& local, TNodeId id);
678+
void RemoveFromPingInProgress(TNodeId node);
679+
void ProcessNodePingQueue();
674680
void SendReconnect(const TActorId& local);
675681
static THolder<TGroupFilter> BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId);
676682
void KickTablet(const TTabletInfo& tablet);
@@ -923,6 +929,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
923929
return CurrentConfig.GetStorageBalancerInflight();
924930
}
925931

932+
ui64 GetMaxPingsInFlight() const {
933+
return CurrentConfig.GetMaxPingsInFlight();
934+
}
935+
926936
static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
927937
static ui64 GetRestartsPerPeriod(const google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
928938
static bool IsSystemTablet(TTabletTypes::EType type);

ydb/core/mind/hive/node_info.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ void TNodeInfo::DeregisterInDomains() {
337337
void TNodeInfo::Ping() {
338338
Y_ABORT_UNLESS((bool)Local);
339339
BLOG_D("Node(" << Id << ") Ping(" << Local << ")");
340-
Hive.SendPing(Local, Id);
340+
Hive.QueuePing(Local);
341341
}
342342

343343
void TNodeInfo::SendReconnect(const TActorId& local) {

ydb/core/mind/hive/tx__register_node.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ class TTxRegisterNode : public TTransactionBase<THive> {
7777
BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete");
7878
TNodeInfo* node = Self->FindNode(Local.NodeId());
7979
if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection
80+
Self->NodePingsInProgress.erase(node->Id);
8081
node->Ping();
82+
Self->ProcessNodePingQueue();
8183
}
8284
}
8385
};

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1479,6 +1479,7 @@ message THiveConfig {
14791479
optional double MinStorageScatterToBalance = 71 [default = 999]; // storage balancer trigger is disabled by default
14801480
optional double MinGroupUsageToBalance = 72 [default = 0.1];
14811481
optional uint64 StorageBalancerInflight = 73 [default = 1];
1482+
optional uint64 MaxPingsInFlight = 78 [default = 1000];
14821483
}
14831484

14841485
message TColumnShardConfig {

ydb/core/protos/counters_hive.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ enum ESimpleCounters {
2929
COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}];
3030
COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}];
3131
COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}];
32+
RESERVED22 = 22;
33+
COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}];
3234
}
3335

3436
enum ECumulativeCounters {

0 commit comments

Comments
 (0)