From cf7b0428c25f2e453fc9823f9a48e90f889a570a Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 19 Dec 2023 12:45:48 +0300 Subject: [PATCH] fix sequence requests lost and add debugging tools to sequence proxy --- ydb/core/kqp/runtime/kqp_sequencer_actor.cpp | 2 +- ydb/core/kqp/session_actor/kqp_query_state.h | 6 ++- .../kqp/session_actor/kqp_session_actor.cpp | 11 ++-- .../sequenceproxy/sequenceproxy_allocate.cpp | 1 + .../tx/sequenceproxy/sequenceproxy_impl.cpp | 53 +++++++++++++++---- .../tx/sequenceproxy/sequenceproxy_impl.h | 22 +++++++- 6 files changed, 76 insertions(+), 19 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp index 87616ce4877e..3c0c9cc9708c 100644 --- a/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_sequencer_actor.cpp @@ -183,7 +183,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped 0 && WaitingReplies == 0) { Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 209224b19755..0eb2c0ee046b 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -15,6 +15,8 @@ #include #include +#include + #include #include @@ -33,7 +35,7 @@ class TKqpQueryState : public TNonCopyable { TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database, const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, - NWilson::TTraceId&& traceId, const TString& sessionId) + NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt) : QueryId(queryId) , Database(database) , Cluster(cluster) @@ -46,6 +48,7 @@ class TKqpQueryState : public TNonCopyable { , StartTime(TInstant::Now()) , KeepSession(ev->Get()->GetKeepSession() || longSession) , UserToken(ev->Get()->GetUserToken()) + , StartedAt(startedAt) { RequestEv.reset(ev->Release().Release()); @@ -98,6 +101,7 @@ class TKqpQueryState : public TNonCopyable { NKqpProto::TKqpStatsQuery Stats; bool KeepSession = false; TIntrusiveConstPtr UserToken; + NActors::TMonotonic StartedAt; THashMap TableVersions; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 8264f7327b2c..beb05142a3c0 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -218,7 +218,8 @@ class TKqpSessionActor : public TActorBootstrapped { ev->Get()->SetClientLostAction(selfId, as); QueryState = std::make_shared( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, - Settings.TableService, Settings.QueryService, std::move(id), SessionId); + Settings.TableService, Settings.QueryService, std::move(id), SessionId, + AppData()->MonotonicTimeProvider->Now()); if (QueryState->UserRequestContext->TraceId.empty()) { QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString(); } @@ -1309,10 +1310,12 @@ class TKqpSessionActor : public TActorBootstrapped { TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName(); LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId); + TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after " + << (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds() + << " milliseconds."; + if (ExecuterId) { - auto abortEv = MakeHolder( - msg.GetStatusCode(), - "Request timeout exceeded"); + auto abortEv = MakeHolder(msg.GetStatusCode(), reason); Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery); } else { const auto& issues = ev->Get()->GetIssues(); diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp index 88b8e1ba60f1..be9da3e15049 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp @@ -142,6 +142,7 @@ namespace NSequenceProxy { auto& info = AllocateInFlight[cookie]; info.Database = database; info.PathId = pathId; + Counters->SequenceShardAllocateCount->Collect(cache); Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache)); return cookie; } diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp index d9faf8d5111f..ed193ac9b640 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp @@ -1,5 +1,6 @@ #include "sequenceproxy_impl.h" +#include #include #include @@ -15,7 +16,21 @@ namespace NKikimr { namespace NSequenceProxy { + TSequenceProxyCounters::TSequenceProxyCounters() { + auto group = GetServiceCounters(AppData()->Counters, "proxy"); + SequenceShardAllocateCount = group->GetHistogram( + "SequenceProxy/SequenceShard/AllocateCountPerRequest", + NMonitoring::ExponentialHistogram(20, 2, 1)); + + ErrorsCount = group->GetCounter("SequenceProxy/Errors", true); + RequestCount = group->GetCounter("SequenceProxy/Requests", true); + ResponseCount = group->GetCounter("SequenceProxy/Responses", true); + NextValLatency = group->GetHistogram("SequenceProxy/Latency", + NMonitoring::ExponentialHistogram(20, 2, 1)); + }; + void TSequenceProxy::Bootstrap() { + Counters.Reset(new TSequenceProxyCounters()); LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] "; Become(&TThis::StateWork); } @@ -30,6 +45,7 @@ namespace NSequenceProxy { request.Sender = ev->Sender; request.Cookie = ev->Cookie; request.UserToken = std::move(msg->UserToken); + request.StartAt = AppData()->MonotonicTimeProvider->Now(); std::visit( [&](const auto& path) { DoNextVal(std::move(request), msg->Database, path); @@ -37,6 +53,21 @@ namespace NSequenceProxy { msg->Path); } + void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { + Counters->ResponseCount->Inc(); + auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds(); + Counters->NextValLatency->Collect(milliseconds); + Counters->ErrorsCount->Inc(); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + } + + void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) { + Counters->ResponseCount->Inc(); + auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds(); + Counters->NextValLatency->Collect(milliseconds); + Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie); + } + void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) { if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) { info.PendingNextValResolve = std::move(info.NewNextValResolve); @@ -46,12 +77,15 @@ namespace NSequenceProxy { } void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) { + Counters->RequestCount->Inc(); auto& info = Databases[database].SequenceByName[path]; info.NewNextValResolve.emplace_back(std::move(request)); MaybeStartResolve(database, path, info); } void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) { + Counters->RequestCount->Inc(); + auto& info = Databases[database].SequenceByPathId[pathId]; if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) { StartResolve(database, pathId, !info.SequenceInfo); @@ -77,14 +111,13 @@ namespace NSequenceProxy { OnChanged(database, pathId, info); } - void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { auto& info = Databases[database].SequenceByName[path]; Y_ABORT_UNLESS(info.ResolveInProgress); info.ResolveInProgress = false; while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + Reply(info.PendingNextValResolve.front(), status, issues); info.PendingNextValResolve.pop_front(); } @@ -111,14 +144,13 @@ namespace NSequenceProxy { MaybeStartResolve(database, path, info); } - void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { auto& info = Databases[database].SequenceByPathId[pathId]; Y_ABORT_UNLESS(info.ResolveInProgress); info.ResolveInProgress = false; while (!info.PendingNextValResolve.empty()) { - const auto& request = info.PendingNextValResolve.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie); + Reply(info.PendingNextValResolve.front(), status, issues); info.PendingNextValResolve.pop_front(); } } @@ -144,7 +176,7 @@ namespace NSequenceProxy { info.PendingNextVal.emplace_back(std::move(request)); ++info.TotalRequested; } - resolved.pop_back(); + resolved.pop_front(); } OnChanged(database, pathId, info); @@ -173,8 +205,7 @@ namespace NSequenceProxy { } else { // We will answer up to cache requests with this error while (cache > 0 && !info.PendingNextVal.empty()) { - const auto& request = info.PendingNextVal.front(); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie); + Reply(info.PendingNextVal.front(), msg->Status, msg->Issues); info.PendingNextVal.pop_front(); --info.TotalRequested; --cache; @@ -209,7 +240,7 @@ namespace NSequenceProxy { << "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId; NYql::TIssueManager issueManager; issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error)); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues())); + Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()); return true; } } @@ -226,7 +257,7 @@ namespace NSequenceProxy { Y_ABORT_UNLESS(!info.CachedAllocations.empty()); auto& front = info.CachedAllocations.front(); Y_ABORT_UNLESS(front.Count > 0); - Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie); + Reply(request, pathId, front.Start); --info.TotalCached; if (--front.Count > 0) { front.Start += front.Increment; diff --git a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h index 2e9ab3ce56eb..65e9f8993c83 100644 --- a/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h +++ b/ydb/core/tx/sequenceproxy/sequenceproxy_impl.h @@ -5,12 +5,26 @@ #include #include +#include +#include #include #include +#include namespace NKikimr { namespace NSequenceProxy { + struct TSequenceProxyCounters : TAtomicRefCount { + ::NMonitoring::TDynamicCounters::TCounterPtr RequestCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount; + ::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount; + + ::NMonitoring::THistogramPtr SequenceShardAllocateCount; + ::NMonitoring::THistogramPtr NextValLatency; + + TSequenceProxyCounters(); + }; + class TSequenceProxy : public TActorBootstrapped { public: TSequenceProxy(const TSequenceProxySettings& settings) @@ -58,6 +72,7 @@ namespace NSequenceProxy { TActorId Sender; ui64 Cookie; TIntrusivePtr UserToken; + TMonotonic StartAt; }; struct TCachedAllocation { @@ -127,13 +142,15 @@ namespace NSequenceProxy { void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev); void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value); ui64 StartResolve(const TString& database, const std::variant& path, bool syncVersion); ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache); void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path); void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true); - void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); - void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues); + void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); + void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues); void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result); void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result); void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList& resolved); @@ -148,6 +165,7 @@ namespace NSequenceProxy { THashMap ResolveInFlight; THashMap AllocateInFlight; ui64 LastCookie = 0; + TIntrusivePtr Counters; }; } // namespace NSequenceProxy