Skip to content

Commit 4ebd552

Browse files
authored
24-3-9-hotfix: Limit inflight cross-database scheme requests in the replication controller (#9394)
1 parent 8c0b69c commit 4ebd552

File tree

6 files changed

+127
-3
lines changed

6 files changed

+127
-3
lines changed

ydb/core/protos/replication.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,13 @@ package NKikimrReplication;
66
option java_package = "ru.yandex.kikimr.proto";
77

88
// Default settings for replication.
message TReplicationDefaults {
    // Upper bounds on how many stream scheme operations may be inflight
    // at the same time. Each limit defaults to 1.
    message TSchemeOperationLimits {
        // Maximum number of concurrently running "create stream" requests.
        optional uint32 InflightCreateStreamLimit = 1 [default = 1];
        // Maximum number of concurrently running "drop stream" requests.
        optional uint32 InflightDropStreamLimit = 2 [default = 1];
    }

    optional int32 RetentionPeriodSeconds = 1 [default = 86400]; // 1d
    optional TSchemeOperationLimits SchemeOperationLimits = 2;
}
1117

1218
message TStaticCredentials {

ydb/core/tx/replication/controller/controller.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "controller.h"
22
#include "controller_impl.h"
33

4+
#include <ydb/core/base/appdata.h>
45
#include <ydb/core/discovery/discovery.h>
56
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
67

@@ -60,6 +61,8 @@ STFUNC(TController::StateWork) {
6061
HFunc(TEvPrivate::TEvProcessQueues, Handle);
6162
HFunc(TEvPrivate::TEvRemoveWorker, Handle);
6263
HFunc(TEvPrivate::TEvDescribeTargetsResult, Handle);
64+
HFunc(TEvPrivate::TEvRequestCreateStream, Handle);
65+
HFunc(TEvPrivate::TEvRequestDropStream, Handle);
6366
HFunc(TEvDiscovery::TEvDiscoveryData, Handle);
6467
HFunc(TEvDiscovery::TEvError, Handle);
6568
HFunc(TEvService::TEvStatus, Handle);
@@ -148,13 +151,53 @@ void TController::Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActor
148151
RunTxAssignStreamName(ev, ctx);
149152
}
150153

154+
template <typename TEvent>
155+
void ProcessLimiterQueue(TDeque<TActorId>& requested, THashSet<TActorId>& inflight, ui32 limit, const TActorContext& ctx) {
156+
while (!requested.empty() && inflight.size() < limit) {
157+
const auto& actorId = requested.front();
158+
ctx.Send(actorId, new TEvent());
159+
inflight.insert(actorId);
160+
requested.pop_front();
161+
}
162+
}
163+
164+
void TController::ProcessCreateStreamQueue(const TActorContext& ctx) {
165+
const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
166+
ProcessLimiterQueue<TEvPrivate::TEvAllowCreateStream>(RequestedCreateStream, InflightCreateStream, limits.GetInflightCreateStreamLimit(), ctx);
167+
}
168+
169+
void TController::ProcessDropStreamQueue(const TActorContext& ctx) {
170+
const auto& limits = AppData()->ReplicationConfig.GetSchemeOperationLimits();
171+
ProcessLimiterQueue<TEvPrivate::TEvAllowDropStream>(RequestedDropStream, InflightDropStream, limits.GetInflightDropStreamLimit(), ctx);
172+
}
173+
174+
// A sender asks for permission to create a stream: append it to the waiting
// queue and immediately try to admit waiters.
void TController::Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    const auto& requester = ev->Sender;
    RequestedCreateStream.push_back(requester);

    ProcessCreateStreamQueue(ctx);
}
180+
151181
// A "create stream" operation has reported its result: release the sender's
// inflight slot, admit the next waiters, then run the result transaction.
void TController::Handle(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    const auto& finished = ev->Sender;
    InflightCreateStream.erase(finished);
    ProcessCreateStreamQueue(ctx);

    RunTxCreateStreamResult(ev, ctx);
}
155188

189+
// A sender asks for permission to drop a stream: append it to the waiting
// queue and immediately try to admit waiters.
void TController::Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    const auto& requester = ev->Sender;
    RequestedDropStream.push_back(requester);

    ProcessDropStreamQueue(ctx);
}
195+
156196
// A "drop stream" operation has reported its result: release the sender's
// inflight slot, admit the next waiters, then run the result transaction.
void TController::Handle(TEvPrivate::TEvDropStreamResult::TPtr& ev, const TActorContext& ctx) {
    CLOG_T(ctx, "Handle " << ev->Get()->ToString());

    const auto& finished = ev->Sender;
    InflightDropStream.erase(finished);
    ProcessDropStreamQueue(ctx);

    RunTxDropStreamResult(ev, ctx);
}
160203

ydb/core/tx/replication/controller/controller_impl.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <ydb/library/actors/core/interconnect.h>
1818
#include <ydb/library/yverify_stream/yverify_stream.h>
1919

20+
#include <util/generic/deque.h>
2021
#include <util/generic/hash.h>
2122
#include <util/generic/hash_set.h>
2223

@@ -83,6 +84,8 @@ class TController
8384
void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx);
8485
void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx);
8586
void Handle(TEvPrivate::TEvDescribeTargetsResult::TPtr& ev, const TActorContext& ctx);
87+
void Handle(TEvPrivate::TEvRequestCreateStream::TPtr& ev, const TActorContext& ctx);
88+
void Handle(TEvPrivate::TEvRequestDropStream::TPtr& ev, const TActorContext& ctx);
8689
void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx);
8790
void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx);
8891
void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx);
@@ -103,6 +106,8 @@ class TController
103106
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
104107
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
105108
void UpdateLag(const TWorkerId& id, TDuration lag);
109+
void ProcessCreateStreamQueue(const TActorContext& ctx);
110+
void ProcessDropStreamQueue(const TActorContext& ctx);
106111

107112
// local transactions
108113
class TTxInitSchema;
@@ -178,6 +183,13 @@ class TController
178183
bool ProcessQueuesScheduled = false;
179184
static constexpr ui32 ProcessBatchLimit = 100;
180185

186+
// create stream limiter
187+
TDeque<TActorId> RequestedCreateStream;
188+
THashSet<TActorId> InflightCreateStream;
189+
// drop stream limiter
190+
TDeque<TActorId> RequestedDropStream;
191+
THashSet<TActorId> InflightDropStream;
192+
181193
}; // TController
182194

183195
}

ydb/core/tx/replication/controller/private_events.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ struct TEvPrivate {
3232
EvAlterDstResult,
3333
EvRemoveWorker,
3434
EvDescribeTargetsResult,
35+
EvRequestCreateStream,
36+
EvAllowCreateStream,
37+
EvRequestDropStream,
38+
EvAllowDropStream,
3539

3640
EvEnd,
3741
};
@@ -221,6 +225,18 @@ struct TEvPrivate {
221225
TString ToString() const override;
222226
};
223227

228+
// Payload-less request for permission to start a "create stream" operation.
struct TEvRequestCreateStream: public TEventLocal<TEvRequestCreateStream, EvRequestCreateStream> {};
230+
231+
// Payload-less grant: the receiver may now start its "create stream" operation.
struct TEvAllowCreateStream: public TEventLocal<TEvAllowCreateStream, EvAllowCreateStream> {};
233+
234+
// Payload-less request for permission to start a "drop stream" operation.
struct TEvRequestDropStream: public TEventLocal<TEvRequestDropStream, EvRequestDropStream> {};
236+
237+
// Payload-less grant: the receiver may now start its "drop stream" operation.
struct TEvAllowDropStream: public TEventLocal<TEvAllowDropStream, EvAllowDropStream> {};
239+
224240
}; // TEvPrivate
225241

226242
}

ydb/core/tx/replication/controller/stream_creator.cpp

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,24 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
2828
.AddAttribute("__async_replication", NJson::WriteJson(attrs, false));
2929
}
3030

31+
// Asks Parent for permission to create the stream and switches to the state
// that waits for TEvAllowCreateStream.
void RequestPermission() {
    Send(Parent, new TEvPrivate::TEvRequestCreateStream());
    Become(&TThis::StateRequestPermission);
}
35+
36+
// State entered by RequestPermission(): reacts to TEvAllowCreateStream and
// forwards every other event to StateBase.
STATEFN(StateRequestPermission) {
    switch (ev->GetTypeRewrite()) {
        hFunc(TEvPrivate::TEvAllowCreateStream, Handle);
    default:
        return StateBase(ev);
    }
}
43+
44+
// Permission received: log the event and start creating the stream.
void Handle(TEvPrivate::TEvAllowCreateStream::TPtr& ev) {
    LOG_T("Handle " << ev->Get()->ToString());
    CreateStream();
}
48+
3149
void CreateStream() {
3250
switch (Kind) {
3351
case TReplication::ETargetKind::Table:
@@ -103,6 +121,10 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
103121
LOG_T("Handle " << ev->Get()->ToString());
104122
auto& result = ev->Get()->Result;
105123

124+
if (result.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) {
125+
return Reply(NYdb::TStatus(NYdb::EStatus::SUCCESS, NYql::TIssues()));
126+
}
127+
106128
if (!result.IsSuccess()) {
107129
if (IsRetryableError(result)) {
108130
LOG_D("Retry CreateConsumer");
@@ -155,7 +177,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
155177
}
156178

157179
// Entry point of the actor: stream creation is no longer started directly;
// the actor first requests permission and creates the stream once allowed.
void Bootstrap() {
    RequestPermission();
}
160182

161183
STATEFN(StateBase) {

ydb/core/tx/replication/controller/stream_remover.cpp

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,24 @@
1010
namespace NKikimr::NReplication::NController {
1111

1212
class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
13+
// Asks Parent for permission to drop the stream and switches to the state
// that waits for TEvAllowDropStream.
void RequestPermission() {
    Send(Parent, new TEvPrivate::TEvRequestDropStream());
    Become(&TThis::StateRequestPermission);
}
17+
18+
// State entered by RequestPermission(): reacts to TEvAllowDropStream and
// forwards every other event to StateBase.
STATEFN(StateRequestPermission) {
    switch (ev->GetTypeRewrite()) {
        hFunc(TEvPrivate::TEvAllowDropStream, Handle);
    default:
        return StateBase(ev);
    }
}
25+
26+
// Permission received: log the event and start dropping the stream.
void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) {
    LOG_T("Handle " << ev->Get()->ToString());
    DropStream();
}
30+
1331
void DropStream() {
1432
switch (Kind) {
1533
case TReplication::ETargetKind::Table:
@@ -26,7 +44,8 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
2644
switch (ev->GetTypeRewrite()) {
2745
hFunc(TEvYdbProxy::TEvAlterTableResponse, Handle);
2846
sFunc(TEvents::TEvWakeup, DropStream);
29-
sFunc(TEvents::TEvPoison, PassAway);
47+
default:
48+
return StateBase(ev);
3049
}
3150
}
3251

@@ -77,7 +96,13 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
7796
}
7897

7998
// Entry point of the actor: stream removal is no longer started directly;
// the actor first requests permission and drops the stream once allowed.
void Bootstrap() {
    RequestPermission();
}
101+
102+
// Fallback handler shared by the other states: TEvPoison terminates the actor
// via PassAway; any other event type is ignored here.
STATEFN(StateBase) {
    switch (ev->GetTypeRewrite()) {
        sFunc(TEvents::TEvPoison, PassAway);
    }
}
82107

83108
private:

0 commit comments

Comments
 (0)