Skip to content

Commit eecf45c

Browse files
The TEvProposePartitionConfig message is sent only to the main partitions (ydb-platform#9599)
1 parent 16c4b26 commit eecf45c

File tree

2 files changed

+66
-8
lines changed

2 files changed

+66
-8
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped<TMonitoringProxy> {
554554
, TabletID(tabletId)
555555
, Inflight(inflight)
556556
{
557-
for (auto& p: Partitions) {
557+
for (auto& p : Partitions) {
558558
Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first));
559559
}
560560
}
@@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
691691
ClearNewConfig();
692692

693693
for (auto& p : Partitions) { //change config for already created partitions
694+
if (p.first.IsSupportivePartition()) {
695+
continue;
696+
}
697+
694698
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
695699
}
696700
ChangePartitionConfigInflight += Partitions.size();
@@ -1865,13 +1869,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext&
18651869
}
18661870
ui32 cnt = 0;
18671871
for (auto& p : Partitions) {
1868-
cnt += p.second.InitDone;
1872+
if (p.first.IsSupportivePartition()) {
1873+
continue;
1874+
}
1875+
1876+
cnt += p.second.InitDone;
18691877
}
18701878
TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx);
18711879

18721880
for (auto& p : Partitions) {
1873-
if (!p.second.InitDone)
1881+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
18741882
continue;
1883+
}
1884+
18751885
THolder<TEvPQ::TEvPartitionOffsets> event = MakeHolder<TEvPQ::TEvPartitionOffsets>(ans, ev->Get()->Record.HasClientId() ?
18761886
ev->Get()->Record.GetClientId() : "");
18771887
ctx.Send(p.second.Actor, event.Release());
@@ -1929,15 +1939,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
19291939
}
19301940

19311941
ui32 cnt = 0;
1932-
for (auto& [_, partitionInfo] : Partitions) {
1933-
cnt += partitionInfo.InitDone;
1942+
for (auto& [partitionId, partitionInfo] : Partitions) {
1943+
if (partitionId.IsSupportivePartition()) {
1944+
continue;
1945+
}
1946+
1947+
cnt += partitionInfo.InitDone;
19341948
}
19351949

19361950
TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
19371951
for (auto& p : Partitions) {
1938-
if (!p.second.InitDone) {
1952+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
19391953
continue;
19401954
}
1955+
19411956
THolder<TEvPQ::TEvPartitionStatus> event;
19421957
if (ev->Get()->Record.GetConsumers().empty()) {
19431958
event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "",
@@ -4544,7 +4559,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
45444559
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45454560
TDistributedTransaction& tx)
45464561
{
4547-
for (auto& [_, partition] : Partitions) {
4562+
for (auto& [partitionId, partition] : Partitions) {
4563+
if (partitionId.IsSupportivePartition()) {
4564+
continue;
4565+
}
4566+
45484567
auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);
45494568

45504569
event->TopicConverter = tx.TopicConverter;
@@ -4555,7 +4574,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45554574
}
45564575

45574576
tx.PartitionRepliesCount = 0;
4558-
tx.PartitionRepliesExpected = Partitions.size();
4577+
tx.PartitionRepliesExpected = OriginalPartitionsCount;
45594578
}
45604579

45614580
TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class TFixture : public NUnitTest::TBaseFixture {
6262
std::optional<size_t> maxPartitionCount = std::nullopt);
6363
void DescribeTopic(const TString& path);
6464

65+
void AddConsumer(const TString& path,
66+
const TVector<TString>& consumers);
67+
6568
void WriteToTopicWithInvalidTxId(bool invalidTxId);
6669

6770
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -192,6 +195,8 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
192195
{
193196
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
194197
settings.SetEnableTopicServiceTx(true);
198+
settings.SetEnablePQConfigTransactionsAtSchemeShard(true);
199+
195200
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
196201

197202
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
@@ -328,6 +333,20 @@ void TFixture::DescribeTopic(const TString& path)
328333
Setup->DescribeTopic(path);
329334
}
330335

336+
void TFixture::AddConsumer(const TString& path,
337+
const TVector<TString>& consumers)
338+
{
339+
NTopic::TTopicClient client(GetDriver());
340+
NTopic::TAlterTopicSettings settings;
341+
342+
for (const auto& consumer : consumers) {
343+
settings.BeginAddConsumer(consumer);
344+
}
345+
346+
auto result = client.AlterTopic(path, settings).GetValueSync();
347+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
348+
}
349+
331350
const TDriver& TFixture::GetDriver() const
332351
{
333352
return *Driver;
@@ -1998,6 +2017,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
19982017
WriteMessagesInTx(0, 1);
19992018
}
20002019

2020+
Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
2021+
{
2022+
CreateTopic("topic_A", TEST_CONSUMER);
2023+
2024+
NTable::TSession tableSession = CreateTableSession();
2025+
NTable::TTransaction tx = BeginTx(tableSession);
2026+
2027+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
2028+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
2029+
2030+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2031+
2032+
AddConsumer("topic_A", {"consumer"});
2033+
2034+
CommitTx(tx, EStatus::SUCCESS);
2035+
2036+
auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2));
2037+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2038+
}
2039+
20012040
}
20022041

20032042
}

0 commit comments

Comments
 (0)