Skip to content

Improve BlobDepot observability #15354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/base/counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ LWTRACE_DEFINE_PROVIDER(MONITORING_PROVIDER)
namespace NKikimr {

static const THashSet<TString> DATABASE_SERVICES
= {{ TString("compile"),
= {{
TString("blob_depot_agent"),
TString("compile"),
TString("coordinator"),
TString("dsproxy"),
TString("dsproxy_mon"),
Expand Down
71 changes: 71 additions & 0 deletions ydb/core/blob_depot/agent/agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ namespace NKikimr::NBlobDepot {
void TBlobDepotAgent::Bootstrap() {
Become(&TThis::StateFunc);

SetupCounters();

if (TabletId && TabletId != Max<ui64>()) {
ConnectToBlobDepot();
}
Expand All @@ -40,6 +42,75 @@ namespace NKikimr::NBlobDepot {
HandlePushMetrics();
}

void TBlobDepotAgent::SetupCounters() {
AgentCounters = GetServiceCounters(AppData()->Counters, "blob_depot_agent")
->GetSubgroup("group", ::ToString(VirtualGroupId));

auto connectivity = AgentCounters->GetSubgroup("subsystem", "connectivity");

ModeConnectPending = connectivity->GetCounter("Mode/ConnectPending", false);
ModeRegistering = connectivity->GetCounter("Mode/Registering", false);
ModeConnected = connectivity->GetCounter("Mode/Connected", false);

auto pendingEventQueue = AgentCounters->GetSubgroup("subsystem", "pendingEventQueue");

PendingEventQueueItems = pendingEventQueue->GetCounter("Items", false);
PendingEventQueueBytes = pendingEventQueue->GetCounter("Bytes", false);

auto requests = AgentCounters->GetSubgroup("subsystem", "requests");

auto makeHist = [&] {
return NMonitoring::ExplicitHistogram({
0.25, 0.5,
1, 2, 4, 8, 16, 32, 64, 128, 256, 512,
1024, 2048, 4096, 8192, 16384, 32768,
65536
});
};

#define XX(ITEM) \
do { \
auto subgroup = requests->GetSubgroup("request", #ITEM); \
RequestsReceived[TEvBlobStorage::ITEM] = subgroup->GetCounter("Received", true); \
SuccessResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "SuccessResponseTime_us", makeHist()); \
ErrorResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "ErrorResponseTime_us", makeHist()); \
} while (false);

ENUMERATE_INCOMING_EVENTS(XX)
#undef XX

auto s3 = AgentCounters->GetSubgroup("subsystem", "s3");

S3GetBytesOk = s3->GetCounter("GetBytesOk", true);
S3GetsOk = s3->GetCounter("GetsOk", true);
S3GetsError = s3->GetCounter("GetsError", true);

S3PutBytesOk = s3->GetCounter("PutBytesOk", true);
S3PutsOk = s3->GetCounter("PutsOk", true);
S3PutsError = s3->GetCounter("PutsError", true);
}

void TBlobDepotAgent::SwitchMode(EMode mode) {
auto getCounter = [&](EMode mode) -> NMonitoring::TCounterForPtr* {
switch (mode) {
case EMode::None: return nullptr;
case EMode::ConnectPending: return ModeConnectPending.Get();
case EMode::Registering: return ModeRegistering.Get();
case EMode::Connected: return ModeConnected.Get();
}
};

if (Mode != mode) {
if (auto *p = getCounter(Mode)) {
--*p;
}
if (auto *p = getCounter(mode)) {
++*p;
}
Mode = mode;
}
}

IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr<TBlobStorageGroupInfo> info, TActorId proxyId) {
return new TBlobDepotAgent(virtualGroupId, std::move(info), proxyId);
}
Expand Down
47 changes: 42 additions & 5 deletions ydb/core/blob_depot/agent/agent_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ namespace NKikimr::NBlobDepot {
TActorId PipeServerId;
bool IsConnected = false;

NMonitoring::TDynamicCounterPtr AgentCounters;

NMonitoring::TDynamicCounters::TCounterPtr ModeConnectPending;
NMonitoring::TDynamicCounters::TCounterPtr ModeRegistering;
NMonitoring::TDynamicCounters::TCounterPtr ModeConnected;

NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueItems;
NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueBytes;

THashMap<ui32, NMonitoring::TDynamicCounters::TCounterPtr> RequestsReceived;
THashMap<ui32, NMonitoring::THistogramPtr> SuccessResponseTime;
THashMap<ui32, NMonitoring::THistogramPtr> ErrorResponseTime;

NMonitoring::TDynamicCounters::TCounterPtr S3GetBytesOk;
NMonitoring::TDynamicCounters::TCounterPtr S3GetsOk;
NMonitoring::TDynamicCounters::TCounterPtr S3GetsError;
NMonitoring::TDynamicCounters::TCounterPtr S3PutBytesOk;
NMonitoring::TDynamicCounters::TCounterPtr S3PutsOk;
NMonitoring::TDynamicCounters::TCounterPtr S3PutsError;

enum class EMode {
None,
ConnectPending,
Registering,
Connected
};

EMode Mode = EMode::None;

void SwitchMode(EMode mode);

private:
struct TEvPrivate {
enum {
Expand Down Expand Up @@ -258,6 +289,9 @@ namespace NKikimr::NBlobDepot {

void PassAway() override {
ClearPendingEventQueue("BlobDepot agent destroyed");
if (AgentCounters) {
GetServiceCounters(AppData()->Counters, "blob_depot_agent")->RemoveSubgroup("group", ::ToString(VirtualGroupId));
}
NTabletPipe::CloseAndForgetClient(SelfId(), PipeId);
if (S3WrapperId) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0));
Expand Down Expand Up @@ -332,6 +366,7 @@ namespace NKikimr::NBlobDepot {

void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
void SetupCounters();
void ConnectToBlobDepot();
void OnConnect();
void OnDisconnect();
Expand Down Expand Up @@ -361,6 +396,7 @@ namespace NKikimr::NBlobDepot {
{
protected:
std::unique_ptr<IEventHandle> Event; // original query event
const TMonotonic Received;
const ui64 QueryId;
mutable TString QueryIdString;
const TMonotonic StartTime;
Expand All @@ -373,7 +409,7 @@ namespace NKikimr::NBlobDepot {
static constexpr TDuration WatchdogDuration = TDuration::Seconds(10);

public:
TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event);
TQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event, TMonotonic received);
virtual ~TQuery();

void CheckQueryExecutionTime(TMonotonic now);
Expand Down Expand Up @@ -422,8 +458,8 @@ namespace NKikimr::NBlobDepot {
template<typename TEvent>
class TBlobStorageQuery : public TQuery {
public:
TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event)
: TQuery(agent, std::move(event))
TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr<IEventHandle> event, TMonotonic received)
: TQuery(agent, std::move(event), received)
, Request(*Event->Get<TEvent>())
{
ExecutionRelay = std::move(Request.ExecutionRelay);
Expand All @@ -437,6 +473,7 @@ namespace NKikimr::NBlobDepot {
std::unique_ptr<IEventHandle> Event;
size_t Size;
TMonotonic ExpirationTimestamp;
TMonotonic Received;
};

std::deque<TPendingEvent> PendingEventQ;
Expand All @@ -448,13 +485,13 @@ namespace NKikimr::NBlobDepot {
TIntrusiveListWithAutoDelete<TQuery, TQuery::TDeleter, TExecutingQueries> DeletePendingQueries;
bool ProcessPendingEventInFlight = false;

template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev);
template<ui32 EventType> TQuery *CreateQuery(std::unique_ptr<IEventHandle> ev, TMonotonic received);
void HandleStorageProxy(TAutoPtr<IEventHandle> ev);
void HandleAssimilate(TAutoPtr<IEventHandle> ev);
void HandlePendingEvent();
void HandleProcessPendingEvent();
void ClearPendingEventQueue(const TString& reason);
void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev);
void ProcessStorageEvent(std::unique_ptr<IEventHandle> ev, TMonotonic received);
void HandlePendingEventQueueWatchdog();
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
void HandleQueryWatchdog();
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/blob_depot/agent/comm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot {
ConnectToBlobDepot();
} else {
PipeServerId = msg.ServerId;
SwitchMode(EMode::Registering);
}
}

Expand All @@ -34,6 +35,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id));
NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id);
RegisterRequest(id, this, nullptr, {}, true);
SwitchMode(EMode::ConnectPending);
}

void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) {
Expand Down Expand Up @@ -147,6 +149,8 @@ namespace NKikimr::NBlobDepot {

void TBlobDepotAgent::OnConnect() {
IsConnected = true;
SwitchMode(EMode::Connected);

HandlePendingEvent();
}

Expand All @@ -163,6 +167,7 @@ namespace NKikimr::NBlobDepot {

ClearPendingEventQueue("BlobDepot tablet disconnected");

SwitchMode(EMode::None);
IsConnected = false;
}

Expand Down
Loading
Loading