@@ -42,6 +42,10 @@ class TFixture : public NUnitTest::TBaseFixture {
42
42
void WaitForEvent ();
43
43
};
44
44
45
+ struct TAlterTopicParameters {
46
+ TMaybe<ui32> PartitionsCount;
47
+ };
48
+
45
49
void SetUp (NUnitTest::TTestContext&) override ;
46
50
47
51
NTable::TSession CreateTableSession ();
@@ -67,6 +71,8 @@ class TFixture : public NUnitTest::TBaseFixture {
67
71
std::optional<size_t > maxPartitionCount = std::nullopt);
68
72
void DescribeTopic (const TString& path);
69
73
74
+ void AddConsumer (const TString& topic, const TVector<TString>& consumers);
75
+
70
76
void WriteToTopicWithInvalidTxId (bool invalidTxId);
71
77
72
78
TTopicWriteSessionPtr CreateTopicWriteSession (const TString& topicPath,
@@ -198,6 +204,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
198
204
{
199
205
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings ();
200
206
settings.SetEnableTopicServiceTx (true );
207
+ settings.SetEnablePQConfigTransactionsAtSchemeShard (true );
201
208
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
202
209
203
210
Driver = std::make_unique<TDriver>(Setup->MakeDriver ());
@@ -329,6 +336,20 @@ void TFixture::CreateTopic(const TString& path,
329
336
Setup->CreateTopic (path, consumer, partitionCount, maxPartitionCount);
330
337
}
331
338
339
+ void TFixture::AddConsumer (const TString& path,
340
+ const TVector<TString>& consumers)
341
+ {
342
+ NTopic::TTopicClient client (GetDriver ());
343
+ NTopic::TAlterTopicSettings settings;
344
+
345
+ for (const auto & consumer : consumers) {
346
+ settings.BeginAddConsumer (consumer);
347
+ }
348
+
349
+ auto result = client.AlterTopic (path, settings).GetValueSync ();
350
+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
351
+ }
352
+
332
353
void TFixture::DescribeTopic (const TString& path)
333
354
{
334
355
Setup->DescribeTopic (path);
@@ -871,6 +892,23 @@ void TFixture::RestartLongTxService()
871
892
}
872
893
}
873
894
895
+ Y_UNIT_TEST_F (WriteToTopic_Demo_39, TFixture)
896
+ {
897
+ CreateTopic (" topic_A" , TEST_CONSUMER);
898
+
899
+ NTable::TSession tableSession = CreateTableSession ();
900
+ NTable::TTransaction tx = BeginTx (tableSession);
901
+
902
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, " message #1" , &tx);
903
+ WriteToTopic (" topic_A" , TEST_MESSAGE_GROUP_ID, " message #2" , &tx);
904
+
905
+ WaitForAcks (" topic_A" , TEST_MESSAGE_GROUP_ID);
906
+
907
+ AddConsumer (" topic_A" , {" consumer" });
908
+
909
+ CommitTx (tx, EStatus::SUCCESS);
910
+ }
911
+
874
912
Y_UNIT_TEST_F (WriteToTopic_Demo_1, TFixture)
875
913
{
876
914
CreateTopic (" topic_A" );
0 commit comments