Skip to content

Add unique consumer for each transfer from topic #15242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/tx/replication/controller/event_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
replication->GetConfig().GetSrcConnectionParams(),
replication->GetConfig().GetConsistencySettings(),
target.GetStreamPath(),
target.GetStreamConsumerName(),
target.GetDstPathId());
}

Expand All @@ -27,6 +28,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
const NKikimrReplication::TConnectionParams& connectionParams,
const NKikimrReplication::TConsistencySettings& consistencySettings,
const TString& srcStreamPath,
const TString& srcStreamConsumerName,
const TPathId& dstPathId)
{
auto ev = MakeHolder<TEvService::TEvRunWorker>();
Expand All @@ -41,7 +43,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
readerSettings.MutableConnectionParams()->CopyFrom(connectionParams);
readerSettings.SetTopicPath(srcStreamPath);
readerSettings.SetTopicPartitionId(workerId);
readerSettings.SetConsumerName(ReplicationConsumerName);
readerSettings.SetConsumerName(srcStreamConsumerName);

switch(config->GetKind()) {
case TReplication::ETargetKind::Table:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/event_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ THolder<TEvService::TEvRunWorker> MakeRunWorkerEv(
const NKikimrReplication::TConnectionParams& connectionParams,
const NKikimrReplication::TConsistencySettings& consistencySettings,
const TString& srcStreamPath,
const TString& srcStreamConsumerName,
const TPathId& dstPathId);

}
2 changes: 2 additions & 0 deletions ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class TReplication: public TSimpleRefCount<TReplication> {

virtual const TString& GetStreamName() const = 0;
virtual void SetStreamName(const TString& value) = 0;
virtual const TString& GetStreamConsumerName() const = 0;
virtual void SetStreamConsumerName(const TString& value) = 0;
virtual TString GetStreamPath() const = 0;

virtual EStreamState GetStreamState() const = 0;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/controller/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ struct TControllerSchema: NIceDb::Schema {
struct TargetId: Column<2, NScheme::NTypeIds::Uint64> {};
struct Name: Column<3, NScheme::NTypeIds::Utf8> {};
struct State: Column<4, NScheme::NTypeIds::Uint8> { using Type = TReplication::EStreamState; };
struct ConsumerName: Column<5, NScheme::NTypeIds::Utf8> {};

using TKey = TableKey<ReplicationId, TargetId>;
using TColumns = TableColumns<ReplicationId, TargetId, Name, State>;
using TColumns = TableColumns<ReplicationId, TargetId, Name, State, ConsumerName>;
};

struct TxIds: Table<5> {
Expand Down
135 changes: 135 additions & 0 deletions ydb/core/tx/replication/controller/stream_consumer_remover.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
#include "logging.h"
#include "private_events.h"
#include "stream_consumer_remover.h"
#include "util.h"

#include <ydb/core/tx/replication/ydb_proxy/ydb_proxy.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb-cpp-sdk/client/types/status/status.h>

namespace NKikimr::NReplication::NController {

class TStreamConsumerRemover: public TActorBootstrapped<TStreamConsumerRemover> {
void RequestPermission() {
Send(Parent, new TEvPrivate::TEvRequestDropStream());
Become(&TThis::StateRequestPermission);
}

STATEFN(StateRequestPermission) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvAllowDropStream, Handle);
default:
return StateBase(ev);
}
}

void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());
DropStreamConsumer();
}

void DropStreamConsumer() {
Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(SrcPath, NYdb::NTopic::TAlterTopicSettings()
.AppendDropConsumers(ConsumerName)));

Become(&TThis::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvYdbProxy::TEvAlterTopicResponse, Handle);
sFunc(TEvents::TEvWakeup, DropStreamConsumer);
default:
return StateBase(ev);
}
}

void Handle(TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev) {
LOG_T("Handle " << ev->Get()->ToString());
auto& result = ev->Get()->Result;

if (!result.IsSuccess()) {
if (IsRetryableError(result)) {
LOG_D("Retry");
return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup);
}

LOG_E("Error"
<< ": status# " << result.GetStatus()
<< ", issues# " << result.GetIssues().ToOneLineString());
} else {
LOG_I("Success"
<< ": issues# " << result.GetIssues().ToOneLineString());
}

Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, std::move(result)));
PassAway();
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_REMOVER;
}

explicit TStreamConsumerRemover(
const TActorId& parent,
const TActorId& proxy,
ui64 rid,
ui64 tid,
TReplication::ETargetKind kind,
const TString& srcPath,
const TString& consumerName)
: Parent(parent)
, YdbProxy(proxy)
, ReplicationId(rid)
, TargetId(tid)
, Kind(kind)
, SrcPath(srcPath)
, ConsumerName(consumerName)
, LogPrefix("StreamConsumerRemover", ReplicationId, TargetId)
{
}

void Bootstrap() {
switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
Y_ABORT("Unreachable");
case TReplication::ETargetKind::Transfer:
return RequestPermission();
}
}

STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
sFunc(TEvents::TEvPoison, PassAway);
}
}

private:
const TActorId Parent;
const TActorId YdbProxy;
const ui64 ReplicationId;
const ui64 TargetId;
const TReplication::ETargetKind Kind;
const TString SrcPath;
const TString ConsumerName;
const TActorLogPrefix LogPrefix;

}; // TStreamRemover

IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx) {
const auto* target = replication->FindTarget(targetId);
Y_ABORT_UNLESS(target);
return CreateStreamConsumerRemover(ctx.SelfID, replication->GetYdbProxy(),
replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetStreamConsumerName());
}

IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName)
{
return new TStreamConsumerRemover(parent, proxy, rid, tid, kind, srcPath, consumerName);
}

}
11 changes: 11 additions & 0 deletions ydb/core/tx/replication/controller/stream_consumer_remover.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include "replication.h"

namespace NKikimr::NReplication::NController {

IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx);
IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName);

}
11 changes: 7 additions & 4 deletions ydb/core/tx/replication/controller/stream_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
const auto streamPath = BuildStreamPath();
const auto settings = NYdb::NTopic::TAlterTopicSettings()
.BeginAddConsumer()
.ConsumerName(ReplicationConsumerName)
.ConsumerName(SrcConsumerName)
.EndAddConsumer();

Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(streamPath, settings));
Expand Down Expand Up @@ -175,6 +175,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
ui64 tid,
const TReplication::ITarget::IConfig::TPtr& config,
const TString& streamName,
const TString& consumerName,
const TDuration& retentionPeriod,
const std::optional<TDuration>& resolvedTimestamps,
bool supportsTopicAutopartitioning)
Expand All @@ -184,6 +185,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
, TargetId(tid)
, Kind(config->GetKind())
, SrcPath(config->GetSrcPath())
, SrcConsumerName(consumerName)
, Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{
{"path", config->GetDstPath()},
{"id", ToString(rid)},
Expand All @@ -210,6 +212,7 @@ class TStreamCreator: public TActorBootstrapped<TStreamCreator> {
const ui64 TargetId;
const TReplication::ETargetKind Kind;
const TString SrcPath;
const TString SrcConsumerName;
const NYdb::NTable::TChangefeedDescription Changefeed;
const TActorLogPrefix LogPrefix;

Expand All @@ -226,19 +229,19 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct

return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
replication->GetId(), target->GetId(),
target->GetConfig(), target->GetStreamName(),
target->GetConfig(), target->GetStreamName(), target->GetStreamConsumerName(),
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication());
}

IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
const TReplication::ITarget::IConfig::TPtr& config,
const TString& streamName, const TDuration& retentionPeriod,
const TString& streamName, const TString& consumerName, const TDuration& retentionPeriod,
const std::optional<TDuration>& resolvedTimestamps,
bool supportsTopicAutopartitioning)
{
return new TStreamCreator(parent, proxy, rid, tid, config,
streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
streamName, consumerName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning);
}

}
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/controller/stream_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct

IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,
const TReplication::ITarget::IConfig::TPtr& config,
const TString& streamName, const TDuration& streamRetentionPeriod,
const TString& streamName, const TString& consumerName, const TDuration& streamRetentionPeriod,
const std::optional<TDuration>& resolvedTimestamps = std::nullopt,
bool supportsTopicAutopartitioning = false);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/replication/controller/stream_creator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(StreamCreator) {
env.GetRuntime().Register(CreateStreamCreator(
env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */,
std::make_shared<TTargetTable::TTableConfig>("/Root/Table", "/Root/Replica"),
"Stream", TDuration::Hours(1), resolvedTimestamps
"Stream", "replicationConsumer", TDuration::Hours(1), resolvedTimestamps
));
{
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvRequestCreateStream>(env.GetSender());
Expand Down
13 changes: 8 additions & 5 deletions ydb/core/tx/replication/controller/stream_remover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
.AppendDropChangefeeds(StreamName)));
break;
case TReplication::ETargetKind::Transfer:
// TODO drop consumer
Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, NYdb::TStatus{NYdb::EStatus::SUCCESS, NYdb::NIssue::TIssues{}}));
PassAway();
return;
Y_ABORT("Unreachable");
}

Become(&TThis::StateWork);
Expand Down Expand Up @@ -102,7 +99,13 @@ class TStreamRemover: public TActorBootstrapped<TStreamRemover> {
}

void Bootstrap() {
RequestPermission();
switch (Kind) {
case TReplication::ETargetKind::Table:
case TReplication::ETargetKind::IndexTable:
return RequestPermission();
case TReplication::ETargetKind::Transfer:
Y_ABORT("Unreachable");
}
}

STATEFN(StateBase) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/replication/controller/target_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ void TTargetBase::SetStreamName(const TString& value) {
StreamName = value;
}

const TString& TTargetBase::GetStreamConsumerName() const {
return StreamConsumerName;
}

void TTargetBase::SetStreamConsumerName(const TString& value) {
StreamConsumerName = value;
}

EStreamState TTargetBase::GetStreamState() const {
return StreamState;
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/controller/target_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class TTargetBase

const TString& GetStreamName() const override;
void SetStreamName(const TString& value) override;
const TString& GetStreamConsumerName() const override;
void SetStreamConsumerName(const TString& value) override;

EStreamState GetStreamState() const override;
void SetStreamState(EStreamState value) override;
Expand All @@ -81,6 +83,7 @@ class TTargetBase
EDstState DstState = EDstState::Creating;
TPathId DstPathId;
TString StreamName;
TString StreamConsumerName;
EStreamState StreamState = EStreamState::Ready;
TString Issue;

Expand Down
Loading
Loading