|
13 | 13 | using namespace NYdb;
|
14 | 14 | using namespace NYdb::NQuery;
|
15 | 15 | using namespace NYdb::NTopic;
|
| 16 | +using namespace NYdb::NReplication; |
16 | 17 |
|
17 | 18 | namespace {
|
18 | 19 |
|
@@ -188,14 +189,22 @@ struct MainTestCase {
|
188 | 189 | }
|
189 | 190 |
|
190 | 191 | auto DescribeTransfer() {
|
191 |
| - NYdb::NReplication::TReplicationClient client(Driver); |
| 192 | + TReplicationClient client(Driver); |
192 | 193 |
|
193 |
| - NYdb::NReplication::TDescribeReplicationSettings settings; |
| 194 | + TDescribeReplicationSettings settings; |
194 | 195 | settings.IncludeStats(true);
|
195 | 196 |
|
196 | 197 | return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
|
197 | 198 | }
|
198 | 199 |
|
| 200 | + auto DescribeTopic() { |
| 201 | + TDescribeTopicSettings settings; |
| 202 | + settings.IncludeLocation(true); |
| 203 | + settings.IncludeStats(true); |
| 204 | + |
| 205 | + return TopicClient.DescribeTopic(TopicName, settings); |
| 206 | + } |
| 207 | + |
199 | 208 | void Write(const TMessage& message) {
|
200 | 209 | TWriteSessionSettings writeSettings;
|
201 | 210 | writeSettings.Path(TopicName);
|
@@ -255,7 +264,7 @@ struct MainTestCase {
|
255 | 264 | break;
|
256 | 265 | }
|
257 | 266 |
|
258 |
| - UNIT_ASSERT_C(attempt, "Unable to wait replication result"); |
| 267 | + UNIT_ASSERT_C(attempt, "Unable to wait transfer result"); |
259 | 268 | Sleep(TDuration::Seconds(1));
|
260 | 269 | }
|
261 | 270 | }
|
@@ -828,6 +837,63 @@ Y_UNIT_TEST_SUITE(Transfer)
|
828 | 837 | {
|
829 | 838 | auto result = testCase.DescribeTransfer().ExtractValueSync();
|
830 | 839 | UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
|
| 840 | + UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus()); |
| 841 | + } |
| 842 | + } |
| 843 | + |
| 844 | + Y_UNIT_TEST(CreateAndDropConsumer) |
| 845 | + { |
| 846 | + MainTestCase testCase; |
| 847 | + testCase.CreateTable(R"( |
| 848 | + CREATE TABLE `%s` ( |
| 849 | + Key Uint64 NOT NULL, |
| 850 | + Message Utf8 NOT NULL, |
| 851 | + PRIMARY KEY (Key) |
| 852 | + ) WITH ( |
| 853 | + STORE = COLUMN |
| 854 | + ); |
| 855 | + )"); |
| 856 | + |
| 857 | + testCase.CreateTopic(); |
| 858 | + testCase.CreateTransfer(R"( |
| 859 | + $l = ($x) -> { |
| 860 | + return [ |
| 861 | + <| |
| 862 | + Key:CAST($x._offset AS Uint64), |
| 863 | + Message:CAST($x._data AS Utf8) |
| 864 | + |> |
| 865 | + ]; |
| 866 | + }; |
| 867 | + )"); |
| 868 | + |
| 869 | + for (size_t i = 20; i--; ) { |
| 870 | + auto result = testCase.DescribeTopic().ExtractValueSync(); |
| 871 | + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); |
| 872 | + auto& consumers = result.GetTopicDescription().GetConsumers(); |
| 873 | + if (1 == consumers.size()) { |
| 874 | + UNIT_ASSERT_VALUES_EQUAL(1, consumers.size()); |
| 875 | + Cerr << "Consumer name is '" << consumers[0].GetConsumerName() << "'" << Endl << Flush; |
| 876 | + UNIT_ASSERT_VALUES_EQUAL_C(35, consumers[0].GetConsumerName().size(), "Consumer name is random uuid"); |
| 877 | + break; |
| 878 | + } |
| 879 | + |
| 880 | + UNIT_ASSERT_C(i, "Unable to wait consumer has been created"); |
| 881 | + Sleep(TDuration::Seconds(1)); |
| 882 | + } |
| 883 | + |
| 884 | + testCase.DropTransfer(); |
| 885 | + |
| 886 | + for (size_t i = 20; i--; ) { |
| 887 | + auto result = testCase.DescribeTopic().ExtractValueSync(); |
| 888 | + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); |
| 889 | + auto& consumers = result.GetTopicDescription().GetConsumers(); |
| 890 | + if (0 == consumers.size()) { |
| 891 | + UNIT_ASSERT_VALUES_EQUAL(0, consumers.size()); |
| 892 | + break; |
| 893 | + } |
| 894 | + |
| 895 | + UNIT_ASSERT_C(i, "Unable to wait consumer has been removed"); |
| 896 | + Sleep(TDuration::Seconds(1)); |
831 | 897 | }
|
832 | 898 | }
|
833 | 899 |
|
|
0 commit comments