Skip to content

Commit 8bc3161

Browse files
alexvrublinkov
authored andcommitted
Improve BlobDepot observability (#15354)
1 parent 5f155ea commit 8bc3161

20 files changed

+264
-43
lines changed

ydb/core/base/counters.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ LWTRACE_DEFINE_PROVIDER(MONITORING_PROVIDER)
77
namespace NKikimr {
88

99
static const THashSet<TString> DATABASE_SERVICES
10-
= {{ TString("compile"),
10+
= {{
11+
TString("blob_depot_agent"),
12+
TString("compile"),
1113
TString("coordinator"),
1214
TString("dsproxy"),
1315
TString("dsproxy_mon"),

ydb/core/blob_depot/agent/agent.cpp

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ namespace NKikimr::NBlobDepot {
3131
void TBlobDepotAgent::Bootstrap() {
3232
Become(&TThis::StateFunc);
3333

34+
SetupCounters();
35+
3436
if (TabletId && TabletId != Max<ui64>()) {
3537
ConnectToBlobDepot();
3638
}
@@ -40,6 +42,75 @@ namespace NKikimr::NBlobDepot {
4042
HandlePushMetrics();
4143
}
4244

45+
void TBlobDepotAgent::SetupCounters() {
46+
AgentCounters = GetServiceCounters(AppData()->Counters, "blob_depot_agent")
47+
->GetSubgroup("group", ::ToString(VirtualGroupId));
48+
49+
auto connectivity = AgentCounters->GetSubgroup("subsystem", "connectivity");
50+
51+
ModeConnectPending = connectivity->GetCounter("Mode/ConnectPending", false);
52+
ModeRegistering = connectivity->GetCounter("Mode/Registering", false);
53+
ModeConnected = connectivity->GetCounter("Mode/Connected", false);
54+
55+
auto pendingEventQueue = AgentCounters->GetSubgroup("subsystem", "pendingEventQueue");
56+
57+
PendingEventQueueItems = pendingEventQueue->GetCounter("Items", false);
58+
PendingEventQueueBytes = pendingEventQueue->GetCounter("Bytes", false);
59+
60+
auto requests = AgentCounters->GetSubgroup("subsystem", "requests");
61+
62+
auto makeHist = [&] {
63+
return NMonitoring::ExplicitHistogram({
64+
0.25, 0.5,
65+
1, 2, 4, 8, 16, 32, 64, 128, 256, 512,
66+
1024, 2048, 4096, 8192, 16384, 32768,
67+
65536
68+
});
69+
};
70+
71+
#define XX(ITEM) \
72+
do { \
73+
auto subgroup = requests->GetSubgroup("request", #ITEM); \
74+
RequestsReceived[TEvBlobStorage::ITEM] = subgroup->GetCounter("Received", true); \
75+
SuccessResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "SuccessResponseTime_us", makeHist()); \
76+
ErrorResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "ErrorResponseTime_us", makeHist()); \
77+
} while (false);
78+
79+
ENUMERATE_INCOMING_EVENTS(XX)
80+
#undef XX
81+
82+
auto s3 = AgentCounters->GetSubgroup("subsystem", "s3");
83+
84+
S3GetBytesOk = s3->GetCounter("GetBytesOk", true);
85+
S3GetsOk = s3->GetCounter("GetsOk", true);
86+
S3GetsError = s3->GetCounter("GetsError", true);
87+
88+
S3PutBytesOk = s3->GetCounter("PutBytesOk", true);
89+
S3PutsOk = s3->GetCounter("PutsOk", true);
90+
S3PutsError = s3->GetCounter("PutsError", true);
91+
}
92+
93+
void TBlobDepotAgent::SwitchMode(EMode mode) {
94+
auto getCounter = [&](EMode mode) -> NMonitoring::TCounterForPtr* {
95+
switch (mode) {
96+
case EMode::None: return nullptr;
97+
case EMode::ConnectPending: return ModeConnectPending.Get();
98+
case EMode::Registering: return ModeRegistering.Get();
99+
case EMode::Connected: return ModeConnected.Get();
100+
}
101+
};
102+
103+
if (Mode != mode) {
104+
if (auto *p = getCounter(Mode)) {
105+
--*p;
106+
}
107+
if (auto *p = getCounter(mode)) {
108+
++*p;
109+
}
110+
Mode = mode;
111+
}
112+
}
113+
43114
IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) {
44115
return new TBlobDepotAgent(virtualGroupId, std::move(info), proxyId);
45116
}

ydb/core/blob_depot/agent/agent_impl.h

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,37 @@ namespace NKikimr::NBlobDepot {
195195
TActorId PipeServerId;
196196
bool IsConnected = false;
197197

198+
NMonitoring::TDynamicCounterPtr AgentCounters;
199+
200+
NMonitoring::TDynamicCounters::TCounterPtr ModeConnectPending;
201+
NMonitoring::TDynamicCounters::TCounterPtr ModeRegistering;
202+
NMonitoring::TDynamicCounters::TCounterPtr ModeConnected;
203+
204+
NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueItems;
205+
NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueBytes;
206+
207+
THashMap<ui32, NMonitoring::TDynamicCounters::TCounterPtr> RequestsReceived;
208+
THashMap<ui32, NMonitoring::THistogramPtr> SuccessResponseTime;
209+
THashMap<ui32, NMonitoring::THistogramPtr> ErrorResponseTime;
210+
211+
NMonitoring::TDynamicCounters::TCounterPtr S3GetBytesOk;
212+
NMonitoring::TDynamicCounters::TCounterPtr S3GetsOk;
213+
NMonitoring::TDynamicCounters::TCounterPtr S3GetsError;
214+
NMonitoring::TDynamicCounters::TCounterPtr S3PutBytesOk;
215+
NMonitoring::TDynamicCounters::TCounterPtr S3PutsOk;
216+
NMonitoring::TDynamicCounters::TCounterPtr S3PutsError;
217+
218+
enum class EMode {
219+
None,
220+
ConnectPending,
221+
Registering,
222+
Connected
223+
};
224+
225+
EMode Mode = EMode::None;
226+
227+
void SwitchMode(EMode mode);
228+
198229
private:
199230
struct TEvPrivate {
200231
enum {
@@ -258,6 +289,9 @@ namespace NKikimr::NBlobDepot {
258289

259290
void PassAway() override {
260291
ClearPendingEventQueue("BlobDepot agent destroyed");
292+
if (AgentCounters) {
293+
GetServiceCounters(AppData()->Counters, "blob_depot_agent")->RemoveSubgroup("group", ::ToString(VirtualGroupId));
294+
}
261295
NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
262296
if (S3WrapperId) {
263297
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
@@ -332,6 +366,7 @@ namespace NKikimr::NBlobDepot {
332366

333367
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
334368
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
369+
void SetupCounters();
335370
void ConnectToBlobDepot();
336371
void OnConnect();
337372
void OnDisconnect();
@@ -361,6 +396,7 @@ namespace NKikimr::NBlobDepot {
361396
{
362397
protected:
363398
std::unique_ptr<IEventHandle> Event; // original query event
399+
const TMonotonic Received;
364400
const ui64 QueryId;
365401
mutable TString QueryIdString;
366402
const TMonotonic StartTime;
@@ -373,7 +409,7 @@ namespace NKikimr::NBlobDepot {
373409
static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);
374410

375411
public:
376-
TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event);
412+
TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event, TMonotonic received);
377413
virtual ~TQuery();
378414

379415
void CheckQueryExecutionTime(TMonotonic now);
@@ -422,8 +458,8 @@ namespace NKikimr::NBlobDepot {
422458
template<typename TEvent>
423459
class TBlobStorageQuery : public TQuery {
424460
public:
425-
TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
426-
: TQuery(agent, std::move(event))
461+
TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event, TMonotonic received)
462+
: TQuery(agent, std::move(event), received)
427463
, Request(*Event->Get<TEvent>())
428464
{
429465
ExecutionRelay = std::move(Request.ExecutionRelay);
@@ -437,6 +473,7 @@ namespace NKikimr::NBlobDepot {
437473
std::unique_ptr<IEventHandle> Event;
438474
size_t Size;
439475
TMonotonic ExpirationTimestamp;
476+
TMonotonic Received;
440477
};
441478

442479
std::deque<TPendingEvent> PendingEventQ;
@@ -448,13 +485,13 @@ namespace NKikimr::NBlobDepot {
448485
TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> DeletePendingQueries;
449486
bool ProcessPendingEventInFlight = false;
450487

451-
template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
488+
template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev, TMonotonic received);
452489
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
453490
void HandleAssimilate(TAutoPtr<IEventHandle> ev);
454491
void HandlePendingEvent();
455492
void HandleProcessPendingEvent();
456493
void ClearPendingEventQueue(const TString& reason);
457-
void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev);
494+
void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev, TMonotonic received);
458495
void HandlePendingEventQueueWatchdog();
459496
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
460497
void HandleQueryWatchdog();

ydb/core/blob_depot/agent/comm.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot {
1414
ConnectToBlobDepot();
1515
} else {
1616
PipeServerId = msg.ServerId;
17+
SwitchMode(EMode::Registering);
1718
}
1819
}
1920

@@ -34,6 +35,7 @@ namespace NKikimr::NBlobDepot {
3435
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id));
3536
NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id);
3637
RegisterRequest(id, this, nullptr, {}, true);
38+
SwitchMode(EMode::ConnectPending);
3739
}
3840

3941
void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) {
@@ -147,6 +149,8 @@ namespace NKikimr::NBlobDepot {
147149

148150
void TBlobDepotAgent::OnConnect() {
149151
IsConnected = true;
152+
SwitchMode(EMode::Connected);
153+
150154
HandlePendingEvent();
151155
}
152156

@@ -163,6 +167,7 @@ namespace NKikimr::NBlobDepot {
163167

164168
ClearPendingEventQueue("BlobDepot tablet disconnected");
165169

170+
SwitchMode(EMode::None);
166171
IsConnected = false;
167172
}
168173

0 commit comments

Comments
 (0)