diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index efc59001cd23..628d941c2532 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped { , TabletID(tabletId) , Inflight(inflight) { - for (auto& p: Partitions) { + for (auto& p : Partitions) { Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first)); } } @@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ClearNewConfig(); for (auto& p : Partitions) { //change config for already created partitions + if (p.first.IsSupportivePartition()) { + continue; + } + ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig())); } ChangePartitionConfigInflight += Partitions.size(); @@ -1871,13 +1875,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& } ui32 cnt = 0; for (auto& p : Partitions) { - cnt += p.second.InitDone; + if (p.first.IsSupportivePartition()) { + continue; + } + + cnt += p.second.InitDone; } TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; + } + THolder event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : ""); ctx.Send(p.second.Actor, event.Release()); @@ -1935,15 +1945,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& } ui32 cnt = 0; - for (auto& [_, partitionInfo] : Partitions) { - cnt += partitionInfo.InitDone; + for (auto& [partitionId, partitionInfo] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + + cnt += partitionInfo.InitDone; } TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) { + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; } + THolder event; if (ev->Get()->Record.GetConsumers().empty()) { event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "", @@ -4554,7 +4569,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target, void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, TDistributedTransaction& tx) { - for (auto& [_, partition] : Partitions) { + for (auto& [partitionId, partition] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + auto event = std::make_unique(tx.Step, tx.TxId); event->TopicConverter = tx.TopicConverter; @@ -4565,7 +4584,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, } tx.PartitionRepliesCount = 0; - tx.PartitionRepliesExpected = Partitions.size(); + tx.PartitionRepliesExpected = OriginalPartitionsCount; } TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 1ef1db60bdd1..3c36b11a325c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -211,6 +211,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&) { NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); + settings.SetEnablePQConfigTransactionsAtSchemeShard(true); Setup = std::make_unique(TEST_CASE_NAME, settings); @@ -2097,6 +2098,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) WriteMessagesInTx(0, 1); } +Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + AddConsumer("topic_A", {"consumer"}); + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); +} + Y_UNIT_TEST_F(ReadRuleGeneration, TFixture) { // There was a server