Skip to content

Commit 54e72e1

Browse files
authored
Merge f9b02f0 into c5b531e
2 parents c5b531e + f9b02f0 commit 54e72e1

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

ydb/public/tools/lib/cmds/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,12 @@ def deploy(arguments):
356356
if kafka_api_port != 0:
357357
optionals['kafka_api_port'] = kafka_api_port
358358

359+
enabled_grpc_services = arguments.enabled_grpc_services.copy() # type: typing.List[str]
360+
if 'YDB_GRPC_SERVICES' in os.environ:
361+
services = os.environ['YDB_GRPC_SERVICES'].split(",")
362+
for service in services:
363+
enabled_grpc_services.append(service)
364+
359365
configuration = KikimrConfigGenerator(
360366
erasure=parse_erasure(arguments),
361367
binary_paths=[arguments.ydb_binary_path] if arguments.ydb_binary_path else None,
@@ -375,7 +381,7 @@ def deploy(arguments):
375381
use_log_files=not arguments.dont_use_log_files,
376382
default_users=default_users(),
377383
extra_feature_flags=enable_feature_flags,
378-
extra_grpc_services=arguments.enabled_grpc_services,
384+
extra_grpc_services=enabled_grpc_services,
379385
generic_connector_config=generic_connector_config(),
380386
**optionals
381387
)

ydb/tests/functional/transfer/main.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb-cpp-sdk/client/topic/client.h>
77
#include <ydb-cpp-sdk/client/proto/accessor.h>
88
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
9+
#include <ydb-cpp-sdk/client/draft/ydb_replication.h>
910

1011
#include <library/cpp/threading/local_executor/local_executor.h>
1112

@@ -179,6 +180,22 @@ struct MainTestCase {
179180
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
180181
}
181182

183+
void DropTransfer() {
184+
auto res = Session.ExecuteQuery(Sprintf(R"(
185+
DROP TRANSFER `%s`;
186+
)", TransferName.data()), TTxControl::NoTx()).GetValueSync();
187+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
188+
}
189+
190+
auto DescribeTransfer() {
191+
NYdb::NReplication::TReplicationClient client(Driver);
192+
193+
NYdb::NReplication::TDescribeReplicationSettings settings;
194+
settings.IncludeStats(true);
195+
196+
return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
197+
}
198+
182199
void Write(const TMessage& message) {
183200
TWriteSessionSettings writeSettings;
184201
writeSettings.Path(TopicName);
@@ -768,5 +785,51 @@ Y_UNIT_TEST_SUITE(Transfer)
768785
});
769786
}
770787

788+
Y_UNIT_TEST(DropTransfer)
789+
{
790+
MainTestCase testCase;
791+
testCase.Run({
792+
.TableDDL = R"(
793+
CREATE TABLE `%s` (
794+
Key Uint64 NOT NULL,
795+
Message Utf8 NOT NULL,
796+
PRIMARY KEY (Key)
797+
) WITH (
798+
STORE = COLUMN
799+
);
800+
)",
801+
802+
.Lambda = R"(
803+
$l = ($x) -> {
804+
return [
805+
<|
806+
Key:CAST($x._offset AS Uint64),
807+
Message:CAST($x._data AS Utf8)
808+
|>
809+
];
810+
};
811+
)",
812+
813+
.Messages = {{"Message-1"}},
814+
815+
.Expectations = {{
816+
_C("Key", ui64(0)),
817+
_C("Message", TString("Message-1")),
818+
}}
819+
});
820+
821+
{
822+
auto result = testCase.DescribeTransfer().ExtractValueSync();
823+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
824+
}
825+
826+
testCase.DropTransfer();
827+
828+
{
829+
auto result = testCase.DescribeTransfer().ExtractValueSync();
830+
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
831+
}
832+
}
833+
771834
}
772835

ydb/tests/functional/transfer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ ENV(YDB_USE_IN_MEMORY_PDISKS=true)
55
ENV(YDB_ERASURE=block_4-2)
66

77
ENV(YDB_FEATURE_FLAGS="enable_topic_transfer")
8+
ENV(YDB_GRPC_SERVICES="replication")
89

910
PEERDIR(
1011
library/cpp/threading/local_executor

0 commit comments

Comments
 (0)