diff --git a/ydb/core/base/counters.cpp b/ydb/core/base/counters.cpp index 915810b80074..2222070449e3 100644 --- a/ydb/core/base/counters.cpp +++ b/ydb/core/base/counters.cpp @@ -7,7 +7,9 @@ LWTRACE_DEFINE_PROVIDER(MONITORING_PROVIDER) namespace NKikimr { static const THashSet DATABASE_SERVICES - = {{ TString("compile"), + = {{ + TString("blob_depot_agent"), + TString("compile"), TString("coordinator"), TString("dsproxy"), TString("dsproxy_mon"), diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index a672ce383295..c8fd22e9b5e5 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -31,6 +31,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Bootstrap() { Become(&TThis::StateFunc); + SetupCounters(); + if (TabletId && TabletId != Max()) { ConnectToBlobDepot(); } @@ -40,6 +42,75 @@ namespace NKikimr::NBlobDepot { HandlePushMetrics(); } + void TBlobDepotAgent::SetupCounters() { + AgentCounters = GetServiceCounters(AppData()->Counters, "blob_depot_agent") + ->GetSubgroup("group", ::ToString(VirtualGroupId)); + + auto connectivity = AgentCounters->GetSubgroup("subsystem", "connectivity"); + + ModeConnectPending = connectivity->GetCounter("Mode/ConnectPending", false); + ModeRegistering = connectivity->GetCounter("Mode/Registering", false); + ModeConnected = connectivity->GetCounter("Mode/Connected", false); + + auto pendingEventQueue = AgentCounters->GetSubgroup("subsystem", "pendingEventQueue"); + + PendingEventQueueItems = pendingEventQueue->GetCounter("Items", false); + PendingEventQueueBytes = pendingEventQueue->GetCounter("Bytes", false); + + auto requests = AgentCounters->GetSubgroup("subsystem", "requests"); + + auto makeHist = [&] { + return NMonitoring::ExplicitHistogram({ + 0.25, 0.5, + 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, + 1024, 2048, 4096, 8192, 16384, 32768, + 65536 + }); + }; + +#define XX(ITEM) \ + do { \ + auto subgroup = requests->GetSubgroup("request", #ITEM); \ + RequestsReceived[TEvBlobStorage::ITEM] = subgroup->GetCounter("Received", true); \ + SuccessResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "SuccessResponseTime_us", makeHist()); \ + ErrorResponseTime[TEvBlobStorage::ITEM] = subgroup->GetNamedHistogram("sensor", "ErrorResponseTime_us", makeHist()); \ + } while (false); + + ENUMERATE_INCOMING_EVENTS(XX) +#undef XX + + auto s3 = AgentCounters->GetSubgroup("subsystem", "s3"); + + S3GetBytesOk = s3->GetCounter("GetBytesOk", true); + S3GetsOk = s3->GetCounter("GetsOk", true); + S3GetsError = s3->GetCounter("GetsError", true); + + S3PutBytesOk = s3->GetCounter("PutBytesOk", true); + S3PutsOk = s3->GetCounter("PutsOk", true); + S3PutsError = s3->GetCounter("PutsError", true); + } + + void TBlobDepotAgent::SwitchMode(EMode mode) { + auto getCounter = [&](EMode mode) -> NMonitoring::TCounterForPtr* { + switch (mode) { + case EMode::None: return nullptr; + case EMode::ConnectPending: return ModeConnectPending.Get(); + case EMode::Registering: return ModeRegistering.Get(); + case EMode::Connected: return ModeConnected.Get(); + } + }; + + if (Mode != mode) { + if (auto *p = getCounter(Mode)) { + --*p; + } + if (auto *p = getCounter(mode)) { + ++*p; + } + Mode = mode; + } + } + IActor *CreateBlobDepotAgent(ui32 virtualGroupId, TIntrusivePtr info, TActorId proxyId) { return new TBlobDepotAgent(virtualGroupId, std::move(info), proxyId); } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 6adf340e63a1..2b061ce95393 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -195,6 +195,37 @@ namespace NKikimr::NBlobDepot { TActorId PipeServerId; bool IsConnected = false; + NMonitoring::TDynamicCounterPtr AgentCounters; + + NMonitoring::TDynamicCounters::TCounterPtr ModeConnectPending; + NMonitoring::TDynamicCounters::TCounterPtr ModeRegistering; + NMonitoring::TDynamicCounters::TCounterPtr ModeConnected; + + NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueItems; + NMonitoring::TDynamicCounters::TCounterPtr PendingEventQueueBytes; + + THashMap RequestsReceived; + THashMap SuccessResponseTime; + THashMap ErrorResponseTime; + + NMonitoring::TDynamicCounters::TCounterPtr S3GetBytesOk; + NMonitoring::TDynamicCounters::TCounterPtr S3GetsOk; + NMonitoring::TDynamicCounters::TCounterPtr S3GetsError; + NMonitoring::TDynamicCounters::TCounterPtr S3PutBytesOk; + NMonitoring::TDynamicCounters::TCounterPtr S3PutsOk; + NMonitoring::TDynamicCounters::TCounterPtr S3PutsError; + + enum class EMode { + None, + ConnectPending, + Registering, + Connected + }; + + EMode Mode = EMode::None; + + void SwitchMode(EMode mode); + private: struct TEvPrivate { enum { @@ -258,6 +289,9 @@ namespace NKikimr::NBlobDepot { void PassAway() override { ClearPendingEventQueue("BlobDepot agent destroyed"); + if (AgentCounters) { + GetServiceCounters(AppData()->Counters, "blob_depot_agent")->RemoveSubgroup("group", ::ToString(VirtualGroupId)); + } NTabletPipe::CloseAndForgetClient(SelfId(), PipeId); if (S3WrapperId) { TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, S3WrapperId, SelfId(), nullptr, 0)); @@ -332,6 +366,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); + void SetupCounters(); void ConnectToBlobDepot(); void OnConnect(); void OnDisconnect(); @@ -361,6 +396,7 @@ namespace NKikimr::NBlobDepot { { protected: std::unique_ptr Event; // original query event + const TMonotonic Received; const ui64 QueryId; mutable TString QueryIdString; const TMonotonic StartTime; @@ -373,7 +409,7 @@ namespace NKikimr::NBlobDepot { static constexpr TDuration WatchdogDuration = TDuration::Seconds(10); public: - TQuery(TBlobDepotAgent& agent, std::unique_ptr event); + TQuery(TBlobDepotAgent& agent, std::unique_ptr event, TMonotonic received); virtual ~TQuery(); void CheckQueryExecutionTime(TMonotonic now); @@ -422,8 +458,8 @@ namespace NKikimr::NBlobDepot { template class TBlobStorageQuery : public TQuery { public: - TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr event) - : TQuery(agent, std::move(event)) + TBlobStorageQuery(TBlobDepotAgent& agent, std::unique_ptr event, TMonotonic received) + : TQuery(agent, std::move(event), received) , Request(*Event->Get()) { ExecutionRelay = std::move(Request.ExecutionRelay); @@ -437,6 +473,7 @@ namespace NKikimr::NBlobDepot { std::unique_ptr Event; size_t Size; TMonotonic ExpirationTimestamp; + TMonotonic Received; }; std::deque PendingEventQ; @@ -448,13 +485,13 @@ namespace NKikimr::NBlobDepot { TIntrusiveListWithAutoDelete DeletePendingQueries; bool ProcessPendingEventInFlight = false; - template TQuery *CreateQuery(std::unique_ptr ev); + template TQuery *CreateQuery(std::unique_ptr ev, TMonotonic received); void HandleStorageProxy(TAutoPtr ev); void HandleAssimilate(TAutoPtr ev); void HandlePendingEvent(); void HandleProcessPendingEvent(); void ClearPendingEventQueue(const TString& reason); - void ProcessStorageEvent(std::unique_ptr ev); + void ProcessStorageEvent(std::unique_ptr ev, TMonotonic received); void HandlePendingEventQueueWatchdog(); void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev); void HandleQueryWatchdog(); diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 82abae697911..add0ec5de77d 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot { ConnectToBlobDepot(); } else { PipeServerId = msg.ServerId; + SwitchMode(EMode::Registering); } } @@ -34,6 +35,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA05, "ConnectToBlobDepot", (AgentId, LogId), (PipeId, PipeId), (RequestId, id)); NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvRegisterAgent(VirtualGroupId, AgentInstanceId), id); RegisterRequest(id, this, nullptr, {}, true); + SwitchMode(EMode::ConnectPending); } void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { @@ -147,6 +149,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::OnConnect() { IsConnected = true; + SwitchMode(EMode::Connected); + HandlePendingEvent(); } @@ -163,6 +167,7 @@ namespace NKikimr::NBlobDepot { ClearPendingEventQueue("BlobDepot tablet disconnected"); + SwitchMode(EMode::None); IsConnected = false; } diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 271451f932ac..7d3567177e81 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -4,9 +4,9 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<0>(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<0>(std::unique_ptr ev, TMonotonic received) { switch (ev->GetTypeRewrite()) { -#define XX(TYPE) case TEvBlobStorage::TYPE: return CreateQuery(std::move(ev)); +#define XX(TYPE) case TEvBlobStorage::TYPE: return CreateQuery(std::move(ev), received); ENUMERATE_INCOMING_EVENTS(XX) #undef XX } @@ -15,8 +15,10 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::HandleStorageProxy(TAutoPtr ev) { bool doForward = false; + const ui32 type = ev->GetTypeRewrite(); + TMonotonic received = TActivationContext::Monotonic(); - switch (ev->GetTypeRewrite()) { + switch (type) { case TEvBlobStorage::EvGet: doForward = ev->Get()->Decommission || ev->Get()->PhantomCheck; @@ -33,33 +35,43 @@ namespace NKikimr::NBlobDepot { if (ProxyId) { TActivationContext::Forward(ev, ProxyId); } else { - CreateQuery<0>(std::unique_ptr(ev.Release()))->EndWithError(NKikimrProto::ERROR, "proxy has vanished"); + CreateQuery<0>(std::unique_ptr(ev.Release()), received) + ->EndWithError(NKikimrProto::ERROR, "proxy has vanished"); } return; } + if (const auto it = RequestsReceived.find(type); it != RequestsReceived.end()) { + ++*it->second; + } else { + Y_DEBUG_ABORT(); + } + std::unique_ptr p(ev.Release()); size_t size = 0; if (!IsConnected) { // check for queue overflow - switch (p->GetTypeRewrite()) { + switch (type) { #define XX(TYPE) case TEvBlobStorage::TYPE: size = p->Get()->CalculateSize(); break; ENUMERATE_INCOMING_EVENTS(XX) #undef XX } if (size + PendingEventBytes > MaxPendingEventBytes) { - CreateQuery<0>(std::move(p))->EndWithError(NKikimrProto::ERROR, "pending event queue overflow"); + CreateQuery<0>(std::move(p), received)->EndWithError(NKikimrProto::ERROR, "pending event queue overflow"); return; } } if (!IsConnected || !PendingEventQ.empty()) { PendingEventBytes += size; - PendingEventQ.push_back(TPendingEvent{std::move(p), size, TMonotonic::Now() + EventExpirationTime}); + PendingEventQ.push_back(TPendingEvent{std::move(p), size, received + EventExpirationTime, received}); + + ++*PendingEventQueueItems; + *PendingEventQueueBytes += size; } else { - ProcessStorageEvent(std::move(p)); + ProcessStorageEvent(std::move(p), received); } } @@ -70,9 +82,13 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::HandlePendingEvent() { for (THPTimer timer; !PendingEventQ.empty(); ) { TPendingEvent& item = PendingEventQ.front(); - ProcessStorageEvent(std::move(item.Event)); + ProcessStorageEvent(std::move(item.Event), item.Received); Y_ABORT_UNLESS(PendingEventBytes >= item.Size); PendingEventBytes -= item.Size; + + --*PendingEventQueueItems; + *PendingEventQueueBytes -= item.Size; + PendingEventQ.pop_front(); if (!PendingEventQ.empty() && TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) { if (!ProcessPendingEventInFlight) { @@ -94,12 +110,16 @@ namespace NKikimr::NBlobDepot { for (auto& item : std::exchange(PendingEventQ, {})) { Y_ABORT_UNLESS(PendingEventBytes >= item.Size); PendingEventBytes -= item.Size; - CreateQuery<0>(std::move(item.Event))->EndWithError(NKikimrProto::ERROR, reason); + CreateQuery<0>(std::move(item.Event), item.Received)->EndWithError(NKikimrProto::ERROR, reason); } + + Y_ABORT_UNLESS(!PendingEventBytes); + *PendingEventQueueItems = 0; + *PendingEventQueueBytes = 0; } - void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr ev) { - TQuery *query = CreateQuery<0>(std::move(ev)); + void TBlobDepotAgent::ProcessStorageEvent(std::unique_ptr ev, TMonotonic received) { + TQuery *query = CreateQuery<0>(std::move(ev), received); STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "new query", (AgentId, LogId), (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { @@ -113,11 +133,18 @@ namespace NKikimr::NBlobDepot { if (!IsConnected) { const TMonotonic now = TActivationContext::Monotonic(); std::deque::iterator it; + size_t numItems = 0; + ui64 numBytes = 0; for (it = PendingEventQ.begin(); it != PendingEventQ.end() && it->ExpirationTimestamp <= now; ++it) { - CreateQuery<0>(std::move(it->Event))->EndWithError(NKikimrProto::ERROR, "pending event queue timeout"); + CreateQuery<0>(std::move(it->Event), it->Received) + ->EndWithError(NKikimrProto::ERROR, "pending event queue timeout"); PendingEventBytes -= it->Size; + ++numItems; + numBytes += it->Size; } PendingEventQ.erase(PendingEventQ.begin(), it); + *PendingEventQueueItems -= numItems; + *PendingEventQueueBytes -= numBytes; } TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvPendingEventQueueWatchdog, 0, @@ -142,9 +169,10 @@ namespace NKikimr::NBlobDepot { {}, nullptr, 0)); } - TBlobDepotAgent::TQuery::TQuery(TBlobDepotAgent& agent, std::unique_ptr event) + TBlobDepotAgent::TQuery::TQuery(TBlobDepotAgent& agent, std::unique_ptr event, TMonotonic received) : TRequestSender(agent) , Event(std::move(event)) + , Received(received) , QueryId(RandomNumber()) , StartTime(TActivationContext::Monotonic()) , QueryWatchdogMapIter(agent.QueryWatchdogMap.emplace(StartTime + WatchdogDuration, this)) @@ -172,7 +200,15 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA14, "query ends with error", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Status, status), (ErrorReason, errorReason), - (Duration, TActivationContext::Monotonic() - StartTime)); + (Duration, TActivationContext::Monotonic() - Received)); + + if (const auto it = Agent.ErrorResponseTime.find(Event->GetTypeRewrite()); it != Agent.ErrorResponseTime.end()) { + const TMonotonic now = TActivationContext::Monotonic(); + const TDuration passed = now - Received; + it->second->Collect(passed.MillisecondsFloat(), 1); + } else { + Y_DEBUG_ABORT(); + } std::unique_ptr response; switch (Event->GetTypeRewrite()) { @@ -194,7 +230,18 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr response) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "query ends with success", (AgentId, Agent.LogId), - (QueryId, GetQueryId()), (Response, response->ToString()), (Duration, TActivationContext::Monotonic() - StartTime)); + (QueryId, GetQueryId()), + (Response, response->ToString()), + (Duration, TActivationContext::Monotonic() - Received)); + + if (const auto it = Agent.SuccessResponseTime.find(Event->GetTypeRewrite()); it != Agent.SuccessResponseTime.end()) { + const TMonotonic now = TActivationContext::Monotonic(); + const TDuration passed = now - Received; + it->second->Collect(passed.MillisecondsFloat(), 1); + } else { + Y_DEBUG_ABORT(); + } + switch (response->Type()) { #define XX(TYPE) \ case TEvBlobStorage::TYPE##Result: \ diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index 653ef5f5252d..085c87cd1017 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -182,8 +182,15 @@ namespace NKikimr::NBlobDepot { TString QueryId; ui64 ReadId; + NMonitoring::TDynamicCounters::TCounterPtr GetBytesOk; + NMonitoring::TDynamicCounters::TCounterPtr GetsOk; + NMonitoring::TDynamicCounters::TCounterPtr GetsError; + public: - TGetActor(size_t outputOffset, std::shared_ptr readContext, TQuery *query) + TGetActor(size_t outputOffset, std::shared_ptr readContext, TQuery *query, + NMonitoring::TDynamicCounters::TCounterPtr getBytesOk, + NMonitoring::TDynamicCounters::TCounterPtr getsOk, + NMonitoring::TDynamicCounters::TCounterPtr getsError) : TActor(&TThis::StateFunc) , OutputOffset(outputOffset) , ReadContext(std::move(readContext)) @@ -191,6 +198,9 @@ namespace NKikimr::NBlobDepot { , AgentLogId(query->Agent.LogId) , QueryId(query->GetQueryId()) , ReadId(ReadContext->GetTag()) + , GetBytesOk(std::move(getBytesOk)) + , GetsOk(std::move(getsOk)) + , GetsError(std::move(getsError)) {} void Handle(NWrappers::TEvExternalStorage::TEvGetObjectResponse::TPtr ev) { @@ -200,6 +210,13 @@ namespace NKikimr::NBlobDepot { (AgentId, AgentLogId), (QueryId, QueryId), (ReadId, ReadId), (Response, msg.Result), (BodyLen, std::size(msg.Body))); + if (msg.IsSuccess()) { + ++*GetsOk; + *GetBytesOk += msg.Body.size(); + } else { + ++*GetsError; + } + if (msg.IsSuccess()) { Finish(std::move(msg.Body), ""); } else if (const auto& error = msg.GetError(); error.GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) { @@ -240,7 +257,8 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA57, "starting S3 read", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (ReadId, context->GetTag()), (Key, item.Key), (Offset, item.Offset), (Size, item.Size), (OutputOffset, item.OutputOffset)); - const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this)); + const TActorId actorId = Agent.RegisterWithSameMailbox(new TGetActor(item.OutputOffset, context, this, + Agent.S3GetBytesOk, Agent.S3GetsOk, Agent.S3GetsError)); auto request = std::make_unique( Aws::S3::Model::GetObjectRequest() .WithBucket(Agent.S3BackendSettings->GetSettings().GetBucket()) diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp index 6afb7a80cf04..5807619746cd 100644 --- a/ydb/core/blob_depot/agent/storage_block.cpp +++ b/ydb/core/blob_depot/agent/storage_block.cpp @@ -4,7 +4,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TBlockQuery : public TBlobStorageQuery { struct TBlockContext : TRequestContext { TMonotonic Timestamp; @@ -63,7 +64,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TBlockQuery(*this, std::move(ev)); + return new TBlockQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index 3f7620da3c10..e5e38e329ed1 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TCollectGarbageQuery : public TBlobStorageQuery { ui32 KeepIndex = 0; ui32 NumKeep; @@ -110,7 +111,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TCollectGarbageQuery(*this, std::move(ev)); + return new TCollectGarbageQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 5cfec5491b31..37c7c4d5be1f 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -4,7 +4,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TDiscoverQuery : public TBlobStorageQuery { bool DoneWithBlockedGeneration = false; bool DoneWithData = false; @@ -182,7 +183,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TDiscoverQuery(*this, std::move(ev)); + return new TDiscoverQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index 4891c66cd67d..7d0e7bdc185a 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -4,7 +4,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TGetQuery : public TBlobStorageQuery { std::unique_ptr Response; ui32 AnswersRemain; @@ -213,7 +214,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TGetQuery(*this, std::move(ev)); + return new TGetQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_get_block.cpp b/ydb/core/blob_depot/agent/storage_get_block.cpp index 60765ab1f71d..aadfd8b9d6b7 100644 --- a/ydb/core/blob_depot/agent/storage_get_block.cpp +++ b/ydb/core/blob_depot/agent/storage_get_block.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TGetBlockQuery : public TBlobStorageQuery { ui32 BlockedGeneration = 0; @@ -41,7 +42,7 @@ namespace NKikimr::NBlobDepot { return Request.TabletId; } }; - return new TGetBlockQuery(*this, std::move(ev)); + return new TGetBlockQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_patch.cpp b/ydb/core/blob_depot/agent/storage_patch.cpp index 1fbb328f345a..dceba968d971 100644 --- a/ydb/core/blob_depot/agent/storage_patch.cpp +++ b/ydb/core/blob_depot/agent/storage_patch.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TPatchQuery : public TBlobStorageQuery { public: using TBlobStorageQuery::TBlobStorageQuery; @@ -18,7 +19,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TPatchQuery(*this, std::move(ev)); + return new TPatchQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 5ead9d58a5e3..9a5267abfa02 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TPutQuery : public TBlobStorageQuery { const bool SuppressFooter = true; const bool IssueUncertainWrites = false; @@ -418,17 +419,22 @@ namespace NKikimr::NBlobDepot { WriterActorId = {}; if (error) { + ++*Agent.S3PutsError; + // LocatorInFlight is not reset here on purpose: OnDestroy will generate spoiled blob message to the // tablet EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to put object to S3: " << *error); } else { + ++*Agent.S3PutsOk; + *Agent.S3PutBytesOk += LocatorInFlight->Len; + LocatorInFlight.reset(); IssueCommitBlobSeq(false); } } }; - return new TPutQuery(*this, std::move(ev)); + return new TPutQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 17d6e0cf6cfb..ee9badc5e496 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TRangeQuery : public TBlobStorageQuery { struct TRead { TLogoBlobID Id; @@ -179,7 +180,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TRangeQuery(*this, std::move(ev)); + return new TRangeQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_status.cpp b/ydb/core/blob_depot/agent/storage_status.cpp index 5b7713db95be..8cce89c8cd2b 100644 --- a/ydb/core/blob_depot/agent/storage_status.cpp +++ b/ydb/core/blob_depot/agent/storage_status.cpp @@ -3,7 +3,8 @@ namespace NKikimr::NBlobDepot { template<> - TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev) { + TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery(std::unique_ptr ev, + TMonotonic received) { class TStatusQuery : public TBlobStorageQuery { public: using TBlobStorageQuery::TBlobStorageQuery; @@ -18,7 +19,7 @@ namespace NKikimr::NBlobDepot { } }; - return new TStatusQuery(*this, std::move(ev)); + return new TStatusQuery(*this, std::move(ev), received); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index b59358ca1126..45b7c68f63c4 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -264,6 +264,7 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + Self->Self->OnUpdateDecommitState(); Self->Self->Data->CommitTrash(this); Self->UpdateAssimilatorPosition(); @@ -539,6 +540,7 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + Self->Self->OnUpdateDecommitState(); Self->ActionInProgress = false; Self->Action(); } @@ -592,7 +594,9 @@ namespace NKikimr::NBlobDepot { return true; } - void Complete(const TActorContext&) override {} + void Complete(const TActorContext&) override { + Self->OnUpdateDecommitState(); + } }; Self->GroupAssimilatorId = {}; @@ -706,4 +710,13 @@ namespace NKikimr::NBlobDepot { } } + void TBlobDepot::OnUpdateDecommitState() { + auto&& c = TabletCounters->Simple(); + const bool d = Configured && Config.GetIsDecommittingGroup(); // is decommission enabled for this tablet? + using E = EDecommitState; + c[NKikimrBlobDepot::COUNTER_DECOMMIT_MODE_PREPARING] = d && DecommitState < E::BlocksFinished; + c[NKikimrBlobDepot::COUNTER_DECOMMIT_MODE_IN_PROGRESS] = d && E::BlocksFinished <= DecommitState && DecommitState < E::Done; + c[NKikimrBlobDepot::COUNTER_DECOMMIT_MODE_DONE] = d && E::Done <= DecommitState; + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index b881a588dde3..a51b7f0708ca 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -168,6 +168,7 @@ namespace NKikimr::NBlobDepot { void OnActivateExecutor(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT24, "OnActivateExecutor", (Id, GetLogId())); Executor()->RegisterExternalTabletCounters(TabletCountersPtr); + TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_MODE_STARTING] = 1; ExecuteTxInitSchema(); } @@ -186,6 +187,8 @@ namespace NKikimr::NBlobDepot { StartDataLoad(); UpdateThroughputs(); InitS3Manager(); + TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_MODE_STARTING] = 0; + TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_MODE_LOADING_KEYS] = 1; } void StartDataLoad(); @@ -339,6 +342,7 @@ namespace NKikimr::NBlobDepot { class TGroupAssimilator; void StartGroupAssimilator(); + void OnUpdateDecommitState(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Group metrics exchange diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index 113a869fd7c5..9ef88d767b9c 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -167,6 +167,8 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::OnDataLoadComplete() { BarrierServer->OnDataLoaded(); StartGroupAssimilator(); + TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_MODE_LOADING_KEYS] = 0; + TabletCounters->Simple()[NKikimrBlobDepot::COUNTER_MODE_LOADED] = 1; } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 3b8a31cdd086..db5fa18a31e8 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -116,6 +116,8 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT20, "TTxLoad::Complete", (Id, Self->GetLogId()), (Configured, Self->Configured)); + Self->OnUpdateDecommitState(); + if (Self->Configured) { Self->StartOperation(); } diff --git a/ydb/core/protos/counters_blob_depot.proto b/ydb/core/protos/counters_blob_depot.proto index 81d0c6cc4e41..ab4ed54f107b 100644 --- a/ydb/core/protos/counters_blob_depot.proto +++ b/ydb/core/protos/counters_blob_depot.proto @@ -13,6 +13,12 @@ enum ESimpleCounters { COUNTER_TOTAL_S3_DATA_SIZE = 5 [(NKikimr.CounterOpts) = {Name: "TotalS3DataSize"}]; COUNTER_TOTAL_S3_TRASH_OBJECTS = 6 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashObjects"}]; COUNTER_TOTAL_S3_TRASH_SIZE = 7 [(NKikimr.CounterOpts) = {Name: "TotalS3TrashSize"}]; + COUNTER_MODE_STARTING = 8 [(NKikimr.CounterOpts) = {Name: "Mode/Starting"}]; + COUNTER_MODE_LOADING_KEYS = 9 [(NKikimr.CounterOpts) = {Name: "Mode/LoadingKeys"}]; + COUNTER_MODE_LOADED = 10 [(NKikimr.CounterOpts) = {Name: "Mode/Loaded"}]; + COUNTER_DECOMMIT_MODE_PREPARING = 11 [(NKikimr.CounterOpts) = {Name: "DecommitMode/Preparing"}]; + COUNTER_DECOMMIT_MODE_IN_PROGRESS = 12 [(NKikimr.CounterOpts) = {Name: "DecommitMode/InProgress"}]; + COUNTER_DECOMMIT_MODE_DONE = 13 [(NKikimr.CounterOpts) = {Name: "DecommitMode/Done"}]; } enum ECumulativeCounters {