Skip to content

Commit e2c166d

Browse files
Add unique consumer for each transfer from topic (#15242)
Co-authored-by: Ilnaz Nizametdinov <[email protected]>
1 parent 7fe0768 commit e2c166d

18 files changed

+306
-19
lines changed

ydb/core/tx/replication/controller/event_util.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
1616
replication->GetConfig().GetSrcConnectionParams(),
1717
replication->GetConfig().GetConsistencySettings(),
1818
target.GetStreamPath(),
19+
target.GetStreamConsumerName(),
1920
target.GetDstPathId());
2021
}
2122

@@ -27,6 +28,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
2728
const NKikimrReplication::TConnectionParams& connectionParams,
2829
const NKikimrReplication::TConsistencySettings& consistencySettings,
2930
const TString& srcStreamPath,
31+
const TString& srcStreamConsumerName,
3032
const TPathId& dstPathId)
3133
{
3234
auto ev = MakeHolder<TEvService::TEvRunWorker>();
@@ -41,7 +43,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
4143
readerSettings.MutableConnectionParams()->CopyFrom(connectionParams);
4244
readerSettings.SetTopicPath(srcStreamPath);
4345
readerSettings.SetTopicPartitionId(workerId);
44-
readerSettings.SetConsumerName(ReplicationConsumerName);
46+
readerSettings.SetConsumerName(srcStreamConsumerName);
4547

4648
switch(config->GetKind()) {
4749
case TReplication::ETargetKind::Table:

ydb/core/tx/replication/controller/event_util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
1919
const NKikimrReplication::TConnectionParams& connectionParams,
2020
const NKikimrReplication::TConsistencySettings& consistencySettings,
2121
const TString& srcStreamPath,
22+
const TString& srcStreamConsumerName,
2223
const TPathId& dstPathId);
2324

2425
}

ydb/core/tx/replication/controller/replication.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class TReplication: public TSimpleRefCount<TReplication> {
8282

8383
virtual const TString& GetStreamName() const = 0;
8484
virtual void SetStreamName(const TString& value) = 0;
85+
virtual const TString& GetStreamConsumerName() const = 0;
86+
virtual void SetStreamConsumerName(const TString& value) = 0;
8587
virtual TString GetStreamPath() const = 0;
8688

8789
virtual EStreamState GetStreamState() const = 0;

ydb/core/tx/replication/controller/schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ struct TControllerSchema: NIceDb::Schema {
5858
struct TargetId: Column<2, NScheme::NTypeIds::Uint64> {};
5959
struct Name: Column<3, NScheme::NTypeIds::Utf8> {};
6060
struct State: Column<4, NScheme::NTypeIds::Uint8> { using Type = TReplication::EStreamState; };
61+
struct ConsumerName: Column<5, NScheme::NTypeIds::Utf8> {};
6162

6263
using TKey = TableKey<ReplicationId, TargetId>;
63-
using TColumns = TableColumns<ReplicationId, TargetId, Name, State>;
64+
using TColumns = TableColumns<ReplicationId, TargetId, Name, State, ConsumerName>;
6465
};
6566

6667
struct TxIds: Table<5> {
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#include "logging.h"
2+
#include "private_events.h"
3+
#include "stream_consumer_remover.h"
4+
#include "util.h"
5+
6+
#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
7+
#include <ydb/library/actors/core/actor_bootstrapped.h>
8+
#include <ydb/library/actors/core/hfunc.h>
9+
#include <ydb-cpp-sdk/client/types/status/status.h>
10+
11+
namespace NKikimr::NReplication::NController {
12+
13+
class TStreamConsumerRemover: public TActorBootstrapped<TStreamConsumerRemover> {
14+
void RequestPermission() {
15+
Send(Parent, new TEvPrivate::TEvRequestDropStream());
16+
Become(&TThis::StateRequestPermission);
17+
}
18+
19+
STATEFN(StateRequestPermission) {
20+
switch (ev->GetTypeRewrite()) {
21+
hFunc(TEvPrivate::TEvAllowDropStream, Handle);
22+
default:
23+
return StateBase(ev);
24+
}
25+
}
26+
27+
void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) {
28+
LOG_T("Handle " << ev->Get()->ToString());
29+
DropStreamConsumer();
30+
}
31+
32+
void DropStreamConsumer() {
33+
Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(SrcPath, NYdb::NTopic::TAlterTopicSettings()
34+
.AppendDropConsumers(ConsumerName)));
35+
36+
Become(&TThis::StateWork);
37+
}
38+
39+
STATEFN(StateWork) {
40+
switch (ev->GetTypeRewrite()) {
41+
hFunc(TEvYdbProxy::TEvAlterTopicResponse, Handle);
42+
sFunc(TEvents::TEvWakeup, DropStreamConsumer);
43+
default:
44+
return StateBase(ev);
45+
}
46+
}
47+
48+
void Handle(TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev) {
49+
LOG_T("Handle " << ev->Get()->ToString());
50+
auto& result = ev->Get()->Result;
51+
52+
if (!result.IsSuccess()) {
53+
if (IsRetryableError(result)) {
54+
LOG_D("Retry");
55+
return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
56+
}
57+
58+
LOG_E("Error"
59+
<< ": status# " << result.GetStatus()
60+
<< ", issues# " << result.GetIssues().ToOneLineString());
61+
} else {
62+
LOG_I("Success"
63+
<< ": issues# " << result.GetIssues().ToOneLineString());
64+
}
65+
66+
Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, std::move(result)));
67+
PassAway();
68+
}
69+
70+
public:
71+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
72+
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_REMOVER;
73+
}
74+
75+
explicit TStreamConsumerRemover(
76+
const TActorId& parent,
77+
const TActorId& proxy,
78+
ui64 rid,
79+
ui64 tid,
80+
TReplication::ETargetKind kind,
81+
const TString& srcPath,
82+
const TString& consumerName)
83+
: Parent(parent)
84+
, YdbProxy(proxy)
85+
, ReplicationId(rid)
86+
, TargetId(tid)
87+
, Kind(kind)
88+
, SrcPath(srcPath)
89+
, ConsumerName(consumerName)
90+
, LogPrefix("StreamConsumerRemover", ReplicationId, TargetId)
91+
{
92+
}
93+
94+
void Bootstrap() {
95+
switch (Kind) {
96+
case TReplication::ETargetKind::Table:
97+
case TReplication::ETargetKind::IndexTable:
98+
Y_ABORT("Unreachable");
99+
case TReplication::ETargetKind::Transfer:
100+
return RequestPermission();
101+
}
102+
}
103+
104+
STATEFN(StateBase) {
105+
switch (ev->GetTypeRewrite()) {
106+
sFunc(TEvents::TEvPoison, PassAway);
107+
}
108+
}
109+
110+
private:
111+
const TActorId Parent;
112+
const TActorId YdbProxy;
113+
const ui64 ReplicationId;
114+
const ui64 TargetId;
115+
const TReplication::ETargetKind Kind;
116+
const TString SrcPath;
117+
const TString ConsumerName;
118+
const TActorLogPrefix LogPrefix;
119+
120+
}; // TStreamRemover
121+
122+
IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
123+
const auto* target = replication->FindTarget(targetId);
124+
Y_ABORT_UNLESS(target);
125+
return CreateStreamConsumerRemover(ctx.SelfID, replication->GetYdbProxy(),
126+
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetStreamConsumerName());
127+
}
128+
129+
IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
130+
TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName)
131+
{
132+
return new TStreamConsumerRemover(parent, proxy, rid, tid, kind, srcPath, consumerName);
133+
}
134+
135+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include "replication.h"
4+
5+
namespace NKikimr::NReplication::NController {
6+
7+
IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx);
8+
IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
9+
TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName);
10+
11+
}

ydb/core/tx/replication/controller/stream_creator.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
117117
const auto streamPath = BuildStreamPath();
118118
const auto settings = NYdb::NTopic::TAlterTopicSettings()
119119
.BeginAddConsumer()
120-
.ConsumerName(ReplicationConsumerName)
120+
.ConsumerName(SrcConsumerName)
121121
.EndAddConsumer();
122122

123123
Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(streamPath, settings));
@@ -175,6 +175,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
175175
ui64 tid,
176176
const TReplication::ITarget::IConfig::TPtr& config,
177177
const TString& streamName,
178+
const TString& consumerName,
178179
const TDuration& retentionPeriod,
179180
const std::optional<TDuration>& resolvedTimestamps,
180181
bool supportsTopicAutopartitioning)
@@ -184,6 +185,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
184185
, TargetId(tid)
185186
, Kind(config->GetKind())
186187
, SrcPath(config->GetSrcPath())
188+
, SrcConsumerName(consumerName)
187189
, Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{
188190
{"path", config->GetDstPath()},
189191
{"id", ToString(rid)},
@@ -210,6 +212,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
210212
const ui64 TargetId;
211213
const TReplication::ETargetKind Kind;
212214
const TString SrcPath;
215+
const TString SrcConsumerName;
213216
const NYdb::NTable::TChangefeedDescription Changefeed;
214217
const TActorLogPrefix LogPrefix;
215218

@@ -226,19 +229,19 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct
226229

227230
return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
228231
replication->GetId(), target->GetId(),
229-
target->GetConfig(), target->GetStreamName(),
232+
target->GetConfig(), target->GetStreamName(), target->GetStreamConsumerName(),
230233
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
231234
AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication());
232235
}
233236

234237
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
235238
const TReplication::ITarget::IConfig::TPtr& config,
236-
const TString& streamName, const TDuration& retentionPeriod,
239+
const TString& streamName, const TString& consumerName, const TDuration& retentionPeriod,
237240
const std::optional<TDuration>& resolvedTimestamps,
238241
bool supportsTopicAutopartitioning)
239242
{
240243
return new TStreamCreator(parent, proxy, rid, tid, config,
241-
streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
244+
streamName, consumerName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
242245
}
243246

244247
}

ydb/core/tx/replication/controller/stream_creator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct
1010

1111
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
1212
const TReplication::ITarget::IConfig::TPtr& config,
13-
const TString& streamName, const TDuration& streamRetentionPeriod,
13+
const TString& streamName, const TString& consumerName, const TDuration& streamRetentionPeriod,
1414
const std::optional<TDuration>& resolvedTimestamps = std::nullopt,
1515
bool supportsTopicAutopartitioning = false);
1616

ydb/core/tx/replication/controller/stream_creator_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(StreamCreator) {
4141
env.GetRuntime().Register(CreateStreamCreator(
4242
env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */,
4343
std::make_shared<TTargetTable::TTableConfig>("/Root/Table", "/Root/Replica"),
44-
"Stream", TDuration::Hours(1), resolvedTimestamps
44+
"Stream", "replicationConsumer", TDuration::Hours(1), resolvedTimestamps
4545
));
4646
{
4747
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvRequestCreateStream>(env.GetSender());

ydb/core/tx/replication/controller/stream_remover.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
3737
.AppendDropChangefeeds(StreamName)));
3838
break;
3939
case TReplication::ETargetKind::Transfer:
40-
// TODO drop consumer
41-
Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, NYdb::TStatus{NYdb::EStatus::SUCCESS, NYdb::NIssue::TIssues{}}));
42-
PassAway();
43-
return;
40+
Y_ABORT("Unreachable");
4441
}
4542

4643
Become(&TThis::StateWork);
@@ -102,7 +99,13 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
10299
}
103100

104101
void Bootstrap() {
105-
RequestPermission();
102+
switch (Kind) {
103+
case TReplication::ETargetKind::Table:
104+
case TReplication::ETargetKind::IndexTable:
105+
return RequestPermission();
106+
case TReplication::ETargetKind::Transfer:
107+
Y_ABORT("Unreachable");
108+
}
106109
}
107110

108111
STATEFN(StateBase) {

ydb/core/tx/replication/controller/target_base.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ void TTargetBase::SetStreamName(const TString& value) {
9797
StreamName = value;
9898
}
9999

100+
const TString& TTargetBase::GetStreamConsumerName() const {
101+
return StreamConsumerName;
102+
}
103+
104+
void TTargetBase::SetStreamConsumerName(const TString& value) {
105+
StreamConsumerName = value;
106+
}
107+
100108
EStreamState TTargetBase::GetStreamState() const {
101109
return StreamState;
102110
}

ydb/core/tx/replication/controller/target_base.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class TTargetBase
5656

5757
const TString& GetStreamName() const override;
5858
void SetStreamName(const TString& value) override;
59+
const TString& GetStreamConsumerName() const override;
60+
void SetStreamConsumerName(const TString& value) override;
5961

6062
EStreamState GetStreamState() const override;
6163
void SetStreamState(EStreamState value) override;
@@ -81,6 +83,7 @@ class TTargetBase
8183
EDstState DstState = EDstState::Creating;
8284
TPathId DstPathId;
8385
TString StreamName;
86+
TString StreamConsumerName;
8487
EStreamState StreamState = EStreamState::Ready;
8588
TString Issue;
8689

0 commit comments

Comments
 (0)