diff --git a/ydb/core/tx/replication/controller/event_util.cpp b/ydb/core/tx/replication/controller/event_util.cpp index 48825932868d..77080559a7bd 100644 --- a/ydb/core/tx/replication/controller/event_util.cpp +++ b/ydb/core/tx/replication/controller/event_util.cpp @@ -16,6 +16,7 @@ THolder MakeRunWorkerEv( replication->GetConfig().GetSrcConnectionParams(), replication->GetConfig().GetConsistencySettings(), target.GetStreamPath(), + target.GetStreamConsumerName(), target.GetDstPathId()); } @@ -27,6 +28,7 @@ THolder MakeRunWorkerEv( const NKikimrReplication::TConnectionParams& connectionParams, const NKikimrReplication::TConsistencySettings& consistencySettings, const TString& srcStreamPath, + const TString& srcStreamConsumerName, const TPathId& dstPathId) { auto ev = MakeHolder(); @@ -41,7 +43,7 @@ THolder MakeRunWorkerEv( readerSettings.MutableConnectionParams()->CopyFrom(connectionParams); readerSettings.SetTopicPath(srcStreamPath); readerSettings.SetTopicPartitionId(workerId); - readerSettings.SetConsumerName(ReplicationConsumerName); + readerSettings.SetConsumerName(srcStreamConsumerName); switch(config->GetKind()) { case TReplication::ETargetKind::Table: diff --git a/ydb/core/tx/replication/controller/event_util.h b/ydb/core/tx/replication/controller/event_util.h index 3a76cf0a2c77..e37dc961635e 100644 --- a/ydb/core/tx/replication/controller/event_util.h +++ b/ydb/core/tx/replication/controller/event_util.h @@ -19,6 +19,7 @@ THolder MakeRunWorkerEv( const NKikimrReplication::TConnectionParams& connectionParams, const NKikimrReplication::TConsistencySettings& consistencySettings, const TString& srcStreamPath, + const TString& srcStreamConsumerName, const TPathId& dstPathId); } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index c95222d382a8..3a922d64ecdb 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -82,6 +82,8 @@ class TReplication: public TSimpleRefCount { virtual const TString& GetStreamName() const = 0; virtual void SetStreamName(const TString& value) = 0; + virtual const TString& GetStreamConsumerName() const = 0; + virtual void SetStreamConsumerName(const TString& value) = 0; virtual TString GetStreamPath() const = 0; virtual EStreamState GetStreamState() const = 0; diff --git a/ydb/core/tx/replication/controller/schema.h b/ydb/core/tx/replication/controller/schema.h index b30ab4a839ad..16a7e4031217 100644 --- a/ydb/core/tx/replication/controller/schema.h +++ b/ydb/core/tx/replication/controller/schema.h @@ -58,9 +58,10 @@ struct TControllerSchema: NIceDb::Schema { struct TargetId: Column<2, NScheme::NTypeIds::Uint64> {}; struct Name: Column<3, NScheme::NTypeIds::Utf8> {}; struct State: Column<4, NScheme::NTypeIds::Uint8> { using Type = TReplication::EStreamState; }; + struct ConsumerName: Column<5, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey; - using TColumns = TableColumns; + using TColumns = TableColumns; }; struct TxIds: Table<5> { diff --git a/ydb/core/tx/replication/controller/stream_consumer_remover.cpp b/ydb/core/tx/replication/controller/stream_consumer_remover.cpp new file mode 100644 index 000000000000..4fde8184f0ca --- /dev/null +++ b/ydb/core/tx/replication/controller/stream_consumer_remover.cpp @@ -0,0 +1,135 @@ +#include "logging.h" +#include "private_events.h" +#include "stream_consumer_remover.h" +#include "util.h" + +#include +#include +#include +#include + +namespace NKikimr::NReplication::NController { + +class TStreamConsumerRemover: public TActorBootstrapped { + void RequestPermission() { + Send(Parent, new TEvPrivate::TEvRequestDropStream()); + Become(&TThis::StateRequestPermission); + } + + STATEFN(StateRequestPermission) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvAllowDropStream, Handle); + default: + return StateBase(ev); + } + } + + void Handle(TEvPrivate::TEvAllowDropStream::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + DropStreamConsumer(); + } + + void DropStreamConsumer() { + Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(SrcPath, NYdb::NTopic::TAlterTopicSettings() + .AppendDropConsumers(ConsumerName))); + + Become(&TThis::StateWork); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYdbProxy::TEvAlterTopicResponse, Handle); + sFunc(TEvents::TEvWakeup, DropStreamConsumer); + default: + return StateBase(ev); + } + } + + void Handle(TEvYdbProxy::TEvAlterTopicResponse::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + auto& result = ev->Get()->Result; + + if (!result.IsSuccess()) { + if (IsRetryableError(result)) { + LOG_D("Retry"); + return Schedule(TDuration::Seconds(10), new TEvents::TEvWakeup); + } + + LOG_E("Error" + << ": status# " << result.GetStatus() + << ", issues# " << result.GetIssues().ToOneLineString()); + } else { + LOG_I("Success" + << ": issues# " << result.GetIssues().ToOneLineString()); + } + + Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, std::move(result))); + PassAway(); + } + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::REPLICATION_CONTROLLER_STREAM_REMOVER; + } + + explicit TStreamConsumerRemover( + const TActorId& parent, + const TActorId& proxy, + ui64 rid, + ui64 tid, + TReplication::ETargetKind kind, + const TString& srcPath, + const TString& consumerName) + : Parent(parent) + , YdbProxy(proxy) + , ReplicationId(rid) + , TargetId(tid) + , Kind(kind) + , SrcPath(srcPath) + , ConsumerName(consumerName) + , LogPrefix("StreamConsumerRemover", ReplicationId, TargetId) + { + } + + void Bootstrap() { + switch (Kind) { + case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: + Y_ABORT("Unreachable"); + case TReplication::ETargetKind::Transfer: + return RequestPermission(); + } + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvPoison, PassAway); + } + } + +private: + const TActorId Parent; + const TActorId YdbProxy; + const ui64 ReplicationId; + const ui64 TargetId; + const TReplication::ETargetKind Kind; + const TString SrcPath; + const TString ConsumerName; + const TActorLogPrefix LogPrefix; + +}; // TStreamRemover + +IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx) { + const auto* target = replication->FindTarget(targetId); + Y_ABORT_UNLESS(target); + return CreateStreamConsumerRemover(ctx.SelfID, replication->GetYdbProxy(), + replication->GetId(), target->GetId(), target->GetKind(), target->GetSrcPath(), target->GetStreamConsumerName()); +} + +IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName) +{ + return new TStreamConsumerRemover(parent, proxy, rid, tid, kind, srcPath, consumerName); +} + +} diff --git a/ydb/core/tx/replication/controller/stream_consumer_remover.h b/ydb/core/tx/replication/controller/stream_consumer_remover.h new file mode 100644 index 000000000000..38d4e9288d28 --- /dev/null +++ b/ydb/core/tx/replication/controller/stream_consumer_remover.h @@ -0,0 +1,11 @@ +#pragma once + +#include "replication.h" + +namespace NKikimr::NReplication::NController { + +IActor* CreateStreamConsumerRemover(TReplication* replication, ui64 targetId, const TActorContext& ctx); +IActor* CreateStreamConsumerRemover(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, + TReplication::ETargetKind kind, const TString& srcPath, const TString& consumerName); + +} diff --git a/ydb/core/tx/replication/controller/stream_creator.cpp b/ydb/core/tx/replication/controller/stream_creator.cpp index 90ab28673176..c9d938b52012 100644 --- a/ydb/core/tx/replication/controller/stream_creator.cpp +++ b/ydb/core/tx/replication/controller/stream_creator.cpp @@ -117,7 +117,7 @@ class TStreamCreator: public TActorBootstrapped { const auto streamPath = BuildStreamPath(); const auto settings = NYdb::NTopic::TAlterTopicSettings() .BeginAddConsumer() - .ConsumerName(ReplicationConsumerName) + .ConsumerName(SrcConsumerName) .EndAddConsumer(); Send(YdbProxy, new TEvYdbProxy::TEvAlterTopicRequest(streamPath, settings)); @@ -175,6 +175,7 @@ class TStreamCreator: public TActorBootstrapped { ui64 tid, const TReplication::ITarget::IConfig::TPtr& config, const TString& streamName, + const TString& consumerName, const TDuration& retentionPeriod, const std::optional& resolvedTimestamps, bool supportsTopicAutopartitioning) @@ -184,6 +185,7 @@ class TStreamCreator: public TActorBootstrapped { , TargetId(tid) , Kind(config->GetKind()) , SrcPath(config->GetSrcPath()) + , SrcConsumerName(consumerName) , Changefeed(MakeChangefeed(streamName, retentionPeriod, resolvedTimestamps, NJson::TJsonMap{ {"path", config->GetDstPath()}, {"id", ToString(rid)}, @@ -210,6 +212,7 @@ class TStreamCreator: public TActorBootstrapped { const ui64 TargetId; const TReplication::ETargetKind Kind; const TString SrcPath; + const TString SrcConsumerName; const NYdb::NTable::TChangefeedDescription Changefeed; const TActorLogPrefix LogPrefix; @@ -226,19 +229,19 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(), replication->GetId(), target->GetId(), - target->GetConfig(), target->GetStreamName(), + target->GetConfig(), target->GetStreamName(), target->GetStreamConsumerName(), TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps, AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication()); } IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, const TReplication::ITarget::IConfig::TPtr& config, - const TString& streamName, const TDuration& retentionPeriod, + const TString& streamName, const TString& consumerName, const TDuration& retentionPeriod, const std::optional& resolvedTimestamps, bool supportsTopicAutopartitioning) { return new TStreamCreator(parent, proxy, rid, tid, config, - streamName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning); + streamName, consumerName, retentionPeriod, resolvedTimestamps, supportsTopicAutopartitioning); } } diff --git a/ydb/core/tx/replication/controller/stream_creator.h b/ydb/core/tx/replication/controller/stream_creator.h index 718499e9c565..00b156425a1d 100644 --- a/ydb/core/tx/replication/controller/stream_creator.h +++ b/ydb/core/tx/replication/controller/stream_creator.h @@ -10,7 +10,7 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid, const TReplication::ITarget::IConfig::TPtr& config, - const TString& streamName, const TDuration& streamRetentionPeriod, + const TString& streamName, const TString& consumerName, const TDuration& streamRetentionPeriod, const std::optional& resolvedTimestamps = std::nullopt, bool supportsTopicAutopartitioning = false); diff --git a/ydb/core/tx/replication/controller/stream_creator_ut.cpp b/ydb/core/tx/replication/controller/stream_creator_ut.cpp index 9f40132df7f5..87b521b8ce5c 100644 --- a/ydb/core/tx/replication/controller/stream_creator_ut.cpp +++ b/ydb/core/tx/replication/controller/stream_creator_ut.cpp @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(StreamCreator) { env.GetRuntime().Register(CreateStreamCreator( env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */, std::make_shared("/Root/Table", "/Root/Replica"), - "Stream", TDuration::Hours(1), resolvedTimestamps + "Stream", "replicationConsumer", TDuration::Hours(1), resolvedTimestamps )); { auto ev = env.GetRuntime().GrabEdgeEvent(env.GetSender()); diff --git a/ydb/core/tx/replication/controller/stream_remover.cpp b/ydb/core/tx/replication/controller/stream_remover.cpp index ad007d7f5632..feccc81dd4c7 100644 --- a/ydb/core/tx/replication/controller/stream_remover.cpp +++ b/ydb/core/tx/replication/controller/stream_remover.cpp @@ -37,10 +37,7 @@ class TStreamRemover: public TActorBootstrapped { .AppendDropChangefeeds(StreamName))); break; case TReplication::ETargetKind::Transfer: - // TODO drop consumer - Send(Parent, new TEvPrivate::TEvDropStreamResult(ReplicationId, TargetId, NYdb::TStatus{NYdb::EStatus::SUCCESS, NYdb::NIssue::TIssues{}})); - PassAway(); - return; + Y_ABORT("Unreachable"); } Become(&TThis::StateWork); @@ -102,7 +99,13 @@ class TStreamRemover: public TActorBootstrapped { } void Bootstrap() { - RequestPermission(); + switch (Kind) { + case TReplication::ETargetKind::Table: + case TReplication::ETargetKind::IndexTable: + return RequestPermission(); + case TReplication::ETargetKind::Transfer: + Y_ABORT("Unreachable"); + } } STATEFN(StateBase) { diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 59ff79a38b1d..2a425706dce3 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -97,6 +97,14 @@ void TTargetBase::SetStreamName(const TString& value) { StreamName = value; } +const TString& TTargetBase::GetStreamConsumerName() const { + return StreamConsumerName; +} + +void TTargetBase::SetStreamConsumerName(const TString& value) { + StreamConsumerName = value; +} + EStreamState TTargetBase::GetStreamState() const { return StreamState; } diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index f3a474752b26..76c2d1f6c93f 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -56,6 +56,8 @@ class TTargetBase const TString& GetStreamName() const override; void SetStreamName(const TString& value) override; + const TString& GetStreamConsumerName() const override; + void SetStreamConsumerName(const TString& value) override; EStreamState GetStreamState() const override; void SetStreamState(EStreamState value) override; @@ -81,6 +83,7 @@ class TTargetBase EDstState DstState = EDstState::Creating; TPathId DstPathId; TString StreamName; + TString StreamConsumerName; EStreamState StreamState = EStreamState::Ready; TString Issue; diff --git a/ydb/core/tx/replication/controller/target_table.cpp b/ydb/core/tx/replication/controller/target_table.cpp index e60b71693e68..f76058250df9 100644 --- a/ydb/core/tx/replication/controller/target_table.cpp +++ b/ydb/core/tx/replication/controller/target_table.cpp @@ -1,5 +1,6 @@ #include "event_util.h" #include "logging.h" +#include "stream_consumer_remover.h" #include "target_table.h" #include "util.h" @@ -33,7 +34,7 @@ class TTableWorkerRegistar: public TActorBootstrapped { auto ev = MakeRunWorkerEv( ReplicationId, TargetId, Config, partition.GetPartitionId(), - ConnectionParams, ConsistencySettings, SrcStreamPath, DstPathId); + ConnectionParams, ConsistencySettings, SrcStreamPath, SrcStreamConsumerName, DstPathId); Send(Parent, std::move(ev)); } @@ -58,6 +59,7 @@ class TTableWorkerRegistar: public TActorBootstrapped { ui64 rid, ui64 tid, const TString& srcStreamPath, + const TString& srcStreamConsumerName, const TPathId& dstPathId, const TReplication::ITarget::IConfig::TPtr& config) : Parent(parent) @@ -67,6 +69,7 @@ class TTableWorkerRegistar: public TActorBootstrapped { , ReplicationId(rid) , TargetId(tid) , SrcStreamPath(srcStreamPath) + , SrcStreamConsumerName(srcStreamConsumerName) , DstPathId(dstPathId) , LogPrefix("TableWorkerRegistar", ReplicationId, TargetId) , Config(config) @@ -94,6 +97,7 @@ class TTableWorkerRegistar: public TActorBootstrapped { const ui64 ReplicationId; const ui64 TargetId; const TString SrcStreamPath; + const TString SrcStreamConsumerName; const TPathId DstPathId; const TActorLogPrefix LogPrefix; const TReplication::ITarget::IConfig::TPtr Config; @@ -111,7 +115,7 @@ IActor* TTargetTableBase::CreateWorkerRegistar(const TActorContext& ctx) const { const auto& config = replication->GetConfig(); return new TTableWorkerRegistar(ctx.SelfID, replication->GetYdbProxy(), config.GetSrcConnectionParams(), config.GetConsistencySettings(), - replication->GetId(), GetId(), BuildStreamPath(), GetDstPathId(), GetConfig()); + replication->GetId(), GetId(), BuildStreamPath(), GetStreamConsumerName(), GetDstPathId(), GetConfig()); } TTargetTable::TTargetTable(TReplication* replication, ui64 id, const IConfig::TPtr& config) @@ -149,6 +153,37 @@ void TTargetTransfer::UpdateConfig(const NKikimrReplication::TReplicationConfig& t.GetTransformLambda()); } +void TTargetTransfer::Progress(const TActorContext& ctx) { + auto replication = GetReplication(); + + switch (GetStreamState()) { + case EStreamState::Removing: + if (GetWorkers()) { + RemoveWorkers(ctx); + } else if (!StreamConsumerRemover) { + StreamConsumerRemover = ctx.Register(CreateStreamConsumerRemover(replication, GetId(), ctx)); + } + return; + case EStreamState::Creating: + case EStreamState::Ready: + case EStreamState::Removed: + case EStreamState::Error: + break; + } + + TTargetWithStream::Progress(ctx); +} + +void TTargetTransfer::Shutdown(const TActorContext& ctx) { + for (auto* x : TVector{&StreamConsumerRemover}) { + if (auto actorId = std::exchange(*x, {})) { + ctx.Send(actorId, new TEvents::TEvPoison()); + } + } + + TTargetWithStream::Shutdown(ctx); +} + TString TTargetTransfer::BuildStreamPath() const { return CanonizePath(GetSrcPath()); } diff --git a/ydb/core/tx/replication/controller/target_table.h b/ydb/core/tx/replication/controller/target_table.h index 6b43fd2fc1e5..23c70dff7cd8 100644 --- a/ydb/core/tx/replication/controller/target_table.h +++ b/ydb/core/tx/replication/controller/target_table.h @@ -68,8 +68,14 @@ class TTargetTransfer: public TTargetTableBase { void UpdateConfig(const NKikimrReplication::TReplicationConfig&) override; + void Progress(const TActorContext& ctx) override; + void Shutdown(const TActorContext& ctx) override; + protected: TString BuildStreamPath() const override; + +private: + TActorId StreamConsumerRemover; }; } diff --git a/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp b/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp index 262cf958b6d3..5c6649375bd7 100644 --- a/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp +++ b/ydb/core/tx/replication/controller/tx_assign_stream_name.cpp @@ -4,6 +4,8 @@ namespace NKikimr::NReplication::NController { +extern const TString ReplicationConsumerName; + class TController::TTxAssignStreamName: public TTxBase { TEvPrivate::TEvAssignStreamName::TPtr Ev; TReplication::TPtr Replication; @@ -49,9 +51,15 @@ class TController::TTxAssignStreamName: public TTxBase { target->SetStreamName(CreateGuidAsString()); + TString consumerName = Replication->GetConfig().HasTransferSpecific() + ? CreateGuidAsString() + : ReplicationConsumerName; + target->SetStreamConsumerName(consumerName); + NIceDb::TNiceDb db(txc.DB); db.Table().Key(rid, tid).Update( - NIceDb::TUpdate(target->GetStreamName()) + NIceDb::TUpdate(target->GetStreamName()), + NIceDb::TUpdate(target->GetStreamConsumerName()) ); CLOG_N(ctx, "Stream name assigned" diff --git a/ydb/core/tx/replication/controller/tx_init.cpp b/ydb/core/tx/replication/controller/tx_init.cpp index 99db3921c94c..749dd1834d0d 100644 --- a/ydb/core/tx/replication/controller/tx_init.cpp +++ b/ydb/core/tx/replication/controller/tx_init.cpp @@ -132,6 +132,7 @@ class TController::TTxInit: public TTxBase { const auto tid = rowset.GetValue(); const auto name = rowset.GetValue(); const auto state = rowset.GetValue(); + const auto consumerName = rowset.GetValueOrDefault(ReplicationConsumerName); auto replication = Self->Find(rid); Y_VERIFY_S(replication, "Unknown replication: " << rid); @@ -143,6 +144,7 @@ class TController::TTxInit: public TTxBase { target->SetStreamName(name); target->SetStreamState(state); + target->SetStreamConsumerName(consumerName); if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 056a10b2465a..758383bf8755 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -31,6 +31,7 @@ SRCS( replication.cpp secret_resolver.cpp session_info.cpp + stream_consumer_remover.cpp stream_creator.cpp stream_remover.cpp sys_params.cpp diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index e991653079fb..ce0dea4b3ce9 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -13,6 +13,7 @@ using namespace NYdb; using namespace NYdb::NQuery; using namespace NYdb::NTopic; +using namespace NYdb::NReplication; namespace { @@ -188,14 +189,22 @@ struct MainTestCase { } auto DescribeTransfer() { - NYdb::NReplication::TReplicationClient client(Driver); + TReplicationClient client(Driver); - NYdb::NReplication::TDescribeReplicationSettings settings; + TDescribeReplicationSettings settings; settings.IncludeStats(true); return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings); } + auto DescribeTopic() { + TDescribeTopicSettings settings; + settings.IncludeLocation(true); + settings.IncludeStats(true); + + return TopicClient.DescribeTopic(TopicName, settings); + } + void Write(const TMessage& message) { TWriteSessionSettings writeSettings; writeSettings.Path(TopicName); @@ -255,7 +264,7 @@ struct MainTestCase { break; } - UNIT_ASSERT_C(attempt, "Unable to wait replication result"); + UNIT_ASSERT_C(attempt, "Unable to wait transfer result"); Sleep(TDuration::Seconds(1)); } } @@ -828,6 +837,63 @@ Y_UNIT_TEST_SUITE(Transfer) { auto result = testCase.DescribeTransfer().ExtractValueSync(); UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString()); + UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus()); + } + } + + Y_UNIT_TEST(CreateAndDropConsumer) + { + MainTestCase testCase; + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + + testCase.CreateTopic(); + testCase.CreateTransfer(R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )"); + + for (size_t i = 20; i--; ) { + auto result = testCase.DescribeTopic().ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); + auto& consumers = result.GetTopicDescription().GetConsumers(); + if (1 == consumers.size()) { + UNIT_ASSERT_VALUES_EQUAL(1, consumers.size()); + Cerr << "Consumer name is '" << consumers[0].GetConsumerName() << "'" << Endl << Flush; + UNIT_ASSERT_VALUES_EQUAL_C(35, consumers[0].GetConsumerName().size(), "Consumer name is random uuid"); + break; + } + + UNIT_ASSERT_C(i, "Unable to wait consumer has been created"); + Sleep(TDuration::Seconds(1)); + } + + testCase.DropTransfer(); + + for (size_t i = 20; i--; ) { + auto result = testCase.DescribeTopic().ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); + auto& consumers = result.GetTopicDescription().GetConsumers(); + if (0 == consumers.size()) { + UNIT_ASSERT_VALUES_EQUAL(0, consumers.size()); + break; + } + + UNIT_ASSERT_C(i, "Unable to wait consumer has been removed"); + Sleep(TDuration::Seconds(1)); } }