diff --git a/ydb/core/viewer/counters_hosts.h b/ydb/core/viewer/counters_hosts.h index 9a65349aae2d..2232f16b00ba 100644 --- a/ydb/core/viewer/counters_hosts.h +++ b/ydb/core/viewer/counters_hosts.h @@ -16,10 +16,13 @@ using namespace NActors; using namespace NNodeWhiteboard; class TCountersHostsList : public TActorBootstrapped { + using TBase = TActorBootstrapped; + IViewer* Viewer; NMon::TEvHttpInfo::TPtr Event; THolder NodesInfo; TMap> NodesResponses; + THashSet TcpProxies; ui32 NodesRequested = 0; ui32 NodesReceived = 0; bool StaticNodesOnly = false; @@ -35,47 +38,48 @@ class TCountersHostsList : public TActorBootstrapped { , Event(ev) {} - void Bootstrap(const TActorContext& ctx) { + void Bootstrap() { const auto& params(Event->Get()->Request.GetParams()); StaticNodesOnly = FromStringWithDefault(params.Get("static_only"), StaticNodesOnly); DynamicNodesOnly = FromStringWithDefault(params.Get("dynamic_only"), DynamicNodesOnly); const TActorId nameserviceId = GetNameserviceActorId(); - ctx.Send(nameserviceId, new TEvInterconnect::TEvListNodes()); - ctx.Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); + Send(nameserviceId, new TEvInterconnect::TEvListNodes()); + Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup()); Become(&TThis::StateRequestedList); } STFUNC(StateRequestedList) { switch (ev->GetTypeRewrite()) { - HFunc(TEvInterconnect::TEvNodesInfo, Handle); - CFunc(TEvents::TSystem::Wakeup, Timeout); + hFunc(TEvInterconnect::TEvNodesInfo, Handle); + cFunc(TEvents::TSystem::Wakeup, Timeout); } } STFUNC(StateRequestedSysInfo) { switch (ev->GetTypeRewrite()) { - HFunc(TEvWhiteboard::TEvSystemStateResponse, Handle); - HFunc(TEvents::TEvUndelivered, Undelivered); - HFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected); - CFunc(TEvents::TSystem::Wakeup, Timeout); + hFunc(TEvWhiteboard::TEvSystemStateResponse, Handle); + hFunc(TEvents::TEvUndelivered, Undelivered); + hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected); + hFunc(TEvInterconnect::TEvNodeConnected, Connected); + cFunc(TEvents::TSystem::Wakeup, Timeout); } } - void SendRequest(ui32 nodeId, const TActorContext& ctx) { + void SendRequest(ui32 nodeId) { TActorId whiteboardServiceId = MakeNodeWhiteboardServiceId(nodeId); THolder request = MakeHolder(); - ctx.Send(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId); - ++NodesRequested; + Send(whiteboardServiceId, request.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, nodeId); + NodesRequested++; } - void NodeStateInfoReceived(const TActorContext& ctx) { + void NodeStateInfoReceived() { ++NodesReceived; if (NodesRequested == NodesReceived) { - ReplyAndDie(ctx); + ReplyAndDie(); } } - void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvInterconnect::TEvNodesInfo::TPtr& ev) { NodesInfo = ev->Release(); ui32 minAllowedNodeId = std::numeric_limits::min(); ui32 maxAllowedNodeId = std::numeric_limits::max(); @@ -90,33 +94,38 @@ class TCountersHostsList : public TActorBootstrapped { } for (const auto& nodeInfo : NodesInfo->Nodes) { if (nodeInfo.NodeId >= minAllowedNodeId && nodeInfo.NodeId <= maxAllowedNodeId) { - SendRequest(nodeInfo.NodeId, ctx); + SendRequest(nodeInfo.NodeId); } } Become(&TThis::StateRequestedSysInfo); } - void Handle(TEvWhiteboard::TEvSystemStateResponse::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvWhiteboard::TEvSystemStateResponse::TPtr& ev) { ui64 nodeId = ev.Get()->Cookie; NodesResponses[nodeId] = ev->Release(); - NodeStateInfoReceived(ctx); + NodeStateInfoReceived(); } - void Undelivered(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx) { + void Undelivered(TEvents::TEvUndelivered::TPtr& ev) { ui32 nodeId = ev.Get()->Cookie; if (NodesResponses.emplace(nodeId, nullptr).second) { - NodeStateInfoReceived(ctx); + NodeStateInfoReceived(); } } - void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) { + void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { ui32 nodeId = ev->Get()->NodeId; + TcpProxies.erase(ev->Sender); if (NodesResponses.emplace(nodeId, nullptr).second) { - NodeStateInfoReceived(ctx); + NodeStateInfoReceived(); } } - void ReplyAndDie(const TActorContext& ctx) { + void Connected(TEvInterconnect::TEvNodeConnected::TPtr& ev) { + TcpProxies.insert(ev->Sender); + } + + void ReplyAndDie() { TStringStream text; for (const auto& [nodeId, sysInfo] : NodesResponses) { if (sysInfo) { @@ -147,12 +156,19 @@ class TCountersHostsList : public TActorBootstrapped { } } } - ctx.Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + text.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); - Die(ctx); + Send(Event->Sender, new NMon::TEvHttpInfoRes(Viewer->GetHTTPOKTEXT(Event->Get()) + text.Str(), 0, NMon::IEvHttpInfoRes::EContentType::Custom)); + PassAway(); + } + + void PassAway() { + for (auto &tcpPorxy: TcpProxies) { + Send(tcpPorxy, new TEvents::TEvUnsubscribe); + } + TBase::PassAway(); } - void Timeout(const TActorContext &ctx) { - ReplyAndDie(ctx); + void Timeout() { + ReplyAndDie(); } }; diff --git a/ydb/core/viewer/json_vdisk_req.h b/ydb/core/viewer/json_vdisk_req.h index 61b805d9a498..28459d510b56 100644 --- a/ydb/core/viewer/json_vdisk_req.h +++ b/ydb/core/viewer/json_vdisk_req.h @@ -60,6 +60,8 @@ class TJsonVDiskRequest : public TViewerPipeClient TcpProxyId; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::VIEWER_HANDLER; @@ -111,6 +113,7 @@ class TJsonVDiskRequest : public TViewerPipeClientSender; + } + void Disconnected() { + TcpProxyId = {}; if (!RetryRequest()) { TBase::RequestDone(); } @@ -170,6 +178,13 @@ class TJsonVDiskRequest : public TViewerPipeClientSend(*TcpProxyId, new TEvents::TEvUnsubscribe); + } + TBase::PassAway(); + } + void ReplyAndPassAway(const TString &error = "") { try { TStringStream json; @@ -182,10 +197,8 @@ class TJsonVDiskRequest : public TViewerPipeClient