Skip to content

Commit c1d5952

Browse files
authored
Fix lost sequence requests and add debugging tools to the sequence proxy (#570)
1 parent 34c1476 commit c1d5952

File tree

6 files changed

+76
-19
lines changed

6 files changed

+76
-19
lines changed

ydb/core/kqp/runtime/kqp_sequencer_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ class TKqpSequencerActor : public NActors::TActorBootstrapped<TKqpSequencerActor
183183
finished = (status == NUdf::EFetchStatus::Finish)
184184
&& (UnprocessedRows == 0);
185185

186-
if (WaitingReplies == 0) {
186+
if (PendingRows.size() > 0 && WaitingReplies == 0) {
187187
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
188188
}
189189

ydb/core/kqp/session_actor/kqp_query_state.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <ydb/core/kqp/common/kqp_user_request_context.h>
1616
#include <ydb/core/kqp/session_actor/kqp_tx.h>
1717

18+
#include <ydb/library/actors/core/monotonic_provider.h>
19+
1820
#include <util/generic/noncopyable.h>
1921
#include <util/generic/string.h>
2022

@@ -33,7 +35,7 @@ class TKqpQueryState : public TNonCopyable {
3335
TKqpQueryState(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 queryId, const TString& database,
3436
const TString& cluster, TKqpDbCountersPtr dbCounters, bool longSession,
3537
const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
36-
NWilson::TTraceId&& traceId, const TString& sessionId)
38+
NWilson::TTraceId&& traceId, const TString& sessionId, TMonotonic startedAt)
3739
: QueryId(queryId)
3840
, Database(database)
3941
, Cluster(cluster)
@@ -46,6 +48,7 @@ class TKqpQueryState : public TNonCopyable {
4648
, StartTime(TInstant::Now())
4749
, KeepSession(ev->Get()->GetKeepSession() || longSession)
4850
, UserToken(ev->Get()->GetUserToken())
51+
, StartedAt(startedAt)
4952
{
5053
RequestEv.reset(ev->Release().Release());
5154

@@ -98,6 +101,7 @@ class TKqpQueryState : public TNonCopyable {
98101
NKqpProto::TKqpStatsQuery Stats;
99102
bool KeepSession = false;
100103
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
104+
NActors::TMonotonic StartedAt;
101105

102106
THashMap<NKikimr::TTableId, ui64> TableVersions;
103107

ydb/core/kqp/session_actor/kqp_session_actor.cpp

+7-4
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
218218
ev->Get()->SetClientLostAction(selfId, as);
219219
QueryState = std::make_shared<TKqpQueryState>(
220220
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
221-
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
221+
Settings.TableService, Settings.QueryService, std::move(id), SessionId,
222+
AppData()->MonotonicTimeProvider->Now());
222223
if (QueryState->UserRequestContext->TraceId.empty()) {
223224
QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
224225
}
@@ -1309,10 +1310,12 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
13091310
TString logMsg = TStringBuilder() << "got TEvAbortExecution in " << CurrentStateFuncName();
13101311
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
13111312

1313+
TString reason = TStringBuilder() << "Request timeout exceeded, cancelling after "
1314+
<< (AppData()->MonotonicTimeProvider->Now() - QueryState->StartedAt).MilliSeconds()
1315+
<< " milliseconds.";
1316+
13121317
if (ExecuterId) {
1313-
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(
1314-
msg.GetStatusCode(),
1315-
"Request timeout exceeded");
1318+
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), reason);
13161319
Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
13171320
} else {
13181321
const auto& issues = ev->Get()->GetIssues();

ydb/core/tx/sequenceproxy/sequenceproxy_allocate.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ namespace NSequenceProxy {
142142
auto& info = AllocateInFlight[cookie];
143143
info.Database = database;
144144
info.PathId = pathId;
145+
Counters->SequenceShardAllocateCount->Collect(cache);
145146
Register(new TAllocateActor(SelfId(), cookie, tabletId, pathId, cache));
146147
return cookie;
147148
}

ydb/core/tx/sequenceproxy/sequenceproxy_impl.cpp

+42-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "sequenceproxy_impl.h"
22

3+
#include <ydb/core/base/appdata_fwd.h>
34
#include <ydb/library/ydb_issue/issue_helpers.h>
45
#include <ydb/library/yql/public/issue/yql_issue_manager.h>
56

@@ -15,7 +16,21 @@
1516
namespace NKikimr {
1617
namespace NSequenceProxy {
1718

19+
TSequenceProxyCounters::TSequenceProxyCounters() {
20+
auto group = GetServiceCounters(AppData()->Counters, "proxy");
21+
SequenceShardAllocateCount = group->GetHistogram(
22+
"SequenceProxy/SequenceShard/AllocateCountPerRequest",
23+
NMonitoring::ExponentialHistogram(20, 2, 1));
24+
25+
ErrorsCount = group->GetCounter("SequenceProxy/Errors", true);
26+
RequestCount = group->GetCounter("SequenceProxy/Requests", true);
27+
ResponseCount = group->GetCounter("SequenceProxy/Responses", true);
28+
NextValLatency = group->GetHistogram("SequenceProxy/Latency",
29+
NMonitoring::ExponentialHistogram(20, 2, 1));
30+
};
31+
1832
void TSequenceProxy::Bootstrap() {
33+
Counters.Reset(new TSequenceProxyCounters());
1934
LogPrefix = TStringBuilder() << "TSequenceProxy [Node " << SelfId().NodeId() << "] ";
2035
Become(&TThis::StateWork);
2136
}
@@ -30,13 +45,29 @@ namespace NSequenceProxy {
3045
request.Sender = ev->Sender;
3146
request.Cookie = ev->Cookie;
3247
request.UserToken = std::move(msg->UserToken);
48+
request.StartAt = AppData()->MonotonicTimeProvider->Now();
3349
std::visit(
3450
[&](const auto& path) {
3551
DoNextVal(std::move(request), msg->Database, path);
3652
},
3753
msg->Path);
3854
}
3955

56+
void TSequenceProxy::Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
57+
Counters->ResponseCount->Inc();
58+
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
59+
Counters->NextValLatency->Collect(milliseconds);
60+
Counters->ErrorsCount->Inc();
61+
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
62+
}
63+
64+
void TSequenceProxy::Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value) {
65+
Counters->ResponseCount->Inc();
66+
auto milliseconds = (AppData()->MonotonicTimeProvider->Now() - request.StartAt).MilliSeconds();
67+
Counters->NextValLatency->Collect(milliseconds);
68+
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, value), 0, request.Cookie);
69+
}
70+
4071
void TSequenceProxy::MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info) {
4172
if (!info.ResolveInProgress && !info.NewNextValResolve.empty()) {
4273
info.PendingNextValResolve = std::move(info.NewNextValResolve);
@@ -46,12 +77,15 @@ namespace NSequenceProxy {
4677
}
4778

4879
void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path) {
80+
Counters->RequestCount->Inc();
4981
auto& info = Databases[database].SequenceByName[path];
5082
info.NewNextValResolve.emplace_back(std::move(request));
5183
MaybeStartResolve(database, path, info);
5284
}
5385

5486
void TSequenceProxy::DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh) {
87+
Counters->RequestCount->Inc();
88+
5589
auto& info = Databases[database].SequenceByPathId[pathId];
5690
if (!info.ResolveInProgress && (needRefresh || !info.SequenceInfo)) {
5791
StartResolve(database, pathId, !info.SequenceInfo);
@@ -77,14 +111,13 @@ namespace NSequenceProxy {
77111
OnChanged(database, pathId, info);
78112
}
79113

80-
void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
114+
void TSequenceProxy::OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
81115
auto& info = Databases[database].SequenceByName[path];
82116
Y_ABORT_UNLESS(info.ResolveInProgress);
83117
info.ResolveInProgress = false;
84118

85119
while (!info.PendingNextValResolve.empty()) {
86-
const auto& request = info.PendingNextValResolve.front();
87-
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
120+
Reply(info.PendingNextValResolve.front(), status, issues);
88121
info.PendingNextValResolve.pop_front();
89122
}
90123

@@ -111,14 +144,13 @@ namespace NSequenceProxy {
111144
MaybeStartResolve(database, path, info);
112145
}
113146

114-
void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
147+
void TSequenceProxy::OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
115148
auto& info = Databases[database].SequenceByPathId[pathId];
116149
Y_ABORT_UNLESS(info.ResolveInProgress);
117150
info.ResolveInProgress = false;
118151

119152
while (!info.PendingNextValResolve.empty()) {
120-
const auto& request = info.PendingNextValResolve.front();
121-
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(status, issues), 0, request.Cookie);
153+
Reply(info.PendingNextValResolve.front(), status, issues);
122154
info.PendingNextValResolve.pop_front();
123155
}
124156
}
@@ -144,7 +176,7 @@ namespace NSequenceProxy {
144176
info.PendingNextVal.emplace_back(std::move(request));
145177
++info.TotalRequested;
146178
}
147-
resolved.pop_back();
179+
resolved.pop_front();
148180
}
149181

150182
OnChanged(database, pathId, info);
@@ -173,8 +205,7 @@ namespace NSequenceProxy {
173205
} else {
174206
// We will answer up to cache requests with this error
175207
while (cache > 0 && !info.PendingNextVal.empty()) {
176-
const auto& request = info.PendingNextVal.front();
177-
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(msg->Status, msg->Issues), 0, request.Cookie);
208+
Reply(info.PendingNextVal.front(), msg->Status, msg->Issues);
178209
info.PendingNextVal.pop_front();
179210
--info.TotalRequested;
180211
--cache;
@@ -209,7 +240,7 @@ namespace NSequenceProxy {
209240
<< "Access denied for " << request.UserToken->GetUserSID() << " to sequence " << pathId;
210241
NYql::TIssueManager issueManager;
211242
issueManager.RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::ACCESS_DENIED, error));
212-
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues()));
243+
Reply(request, Ydb::StatusIds::UNAUTHORIZED, issueManager.GetIssues());
213244
return true;
214245
}
215246
}
@@ -226,7 +257,7 @@ namespace NSequenceProxy {
226257
Y_ABORT_UNLESS(!info.CachedAllocations.empty());
227258
auto& front = info.CachedAllocations.front();
228259
Y_ABORT_UNLESS(front.Count > 0);
229-
Send(request.Sender, new TEvSequenceProxy::TEvNextValResult(pathId, front.Start), 0, request.Cookie);
260+
Reply(request, pathId, front.Start);
230261
--info.TotalCached;
231262
if (--front.Count > 0) {
232263
front.Start += front.Increment;

ydb/core/tx/sequenceproxy/sequenceproxy_impl.h

+20-2
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,26 @@
55
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
66
#include <ydb/core/tx/sequenceproxy/public/events.h>
77

8+
#include <ydb/core/base/counters.h>
9+
#include <library/cpp/monlib/dynamic_counters/counters.h>
810
#include <ydb/library/actors/core/actor_bootstrapped.h>
911
#include <ydb/library/actors/core/hfunc.h>
12+
#include <ydb/library/actors/core/monotonic_provider.h>
1013

1114
namespace NKikimr {
1215
namespace NSequenceProxy {
1316

17+
struct TSequenceProxyCounters : TAtomicRefCount<TSequenceProxyCounters> {
18+
::NMonitoring::TDynamicCounters::TCounterPtr RequestCount;
19+
::NMonitoring::TDynamicCounters::TCounterPtr ResponseCount;
20+
::NMonitoring::TDynamicCounters::TCounterPtr ErrorsCount;
21+
22+
::NMonitoring::THistogramPtr SequenceShardAllocateCount;
23+
::NMonitoring::THistogramPtr NextValLatency;
24+
25+
TSequenceProxyCounters();
26+
};
27+
1428
class TSequenceProxy : public TActorBootstrapped<TSequenceProxy> {
1529
public:
1630
TSequenceProxy(const TSequenceProxySettings& settings)
@@ -58,6 +72,7 @@ namespace NSequenceProxy {
5872
TActorId Sender;
5973
ui64 Cookie;
6074
TIntrusivePtr<NACLib::TUserToken> UserToken;
75+
TMonotonic StartAt;
6176
};
6277

6378
struct TCachedAllocation {
@@ -127,13 +142,15 @@ namespace NSequenceProxy {
127142
void Handle(TEvPrivate::TEvAllocateResult::TPtr& ev);
128143
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
129144

145+
void Reply(const TNextValRequestInfo& request, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
146+
void Reply(const TNextValRequestInfo& request, const TPathId& pathId, i64 value);
130147
ui64 StartResolve(const TString& database, const std::variant<TString, TPathId>& path, bool syncVersion);
131148
ui64 StartAllocate(ui64 tabletId, const TString& database, const TPathId& pathId, ui64 cache);
132149
void MaybeStartResolve(const TString& database, const TString& path, TSequenceByName& info);
133150
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TString& path);
134151
void DoNextVal(TNextValRequestInfo&& request, const TString& database, const TPathId& pathId, bool needRefresh = true);
135-
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
136-
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues);
152+
void OnResolveError(const TString& database, const TString& path, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
153+
void OnResolveError(const TString& database, const TPathId& pathId, Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues);
137154
void OnResolveResult(const TString& database, const TString& path, TResolveResult&& result);
138155
void OnResolveResult(const TString& database, const TPathId& pathId, TResolveResult&& result);
139156
void OnResolved(const TString& database, const TPathId& pathId, TSequenceByPathId& info, TList<TNextValRequestInfo>& resolved);
@@ -148,6 +165,7 @@ namespace NSequenceProxy {
148165
THashMap<ui64, TResolveInFlight> ResolveInFlight;
149166
THashMap<ui64, TAllocateInFlight> AllocateInFlight;
150167
ui64 LastCookie = 0;
168+
TIntrusivePtr<TSequenceProxyCounters> Counters;
151169
};
152170

153171
} // namespace NSequenceProxy

0 commit comments

Comments
 (0)