Skip to content

Commit 2148020

Browse files
The TEvProposePartitionConfig message is sent only to the main partitions (#9599)
1 parent 350eccc commit 2148020

File tree

2 files changed

+48
-8
lines changed

2 files changed

+48
-8
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped<TMonitoringProxy> {
554554
, TabletID(tabletId)
555555
, Inflight(inflight)
556556
{
557-
for (auto& p: Partitions) {
557+
for (auto& p : Partitions) {
558558
Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first));
559559
}
560560
}
@@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
691691
ClearNewConfig();
692692

693693
for (auto& p : Partitions) { //change config for already created partitions
694+
if (p.first.IsSupportivePartition()) {
695+
continue;
696+
}
697+
694698
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
695699
}
696700
ChangePartitionConfigInflight += Partitions.size();
@@ -1871,13 +1875,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext&
18711875
}
18721876
ui32 cnt = 0;
18731877
for (auto& p : Partitions) {
1874-
cnt += p.second.InitDone;
1878+
if (p.first.IsSupportivePartition()) {
1879+
continue;
1880+
}
1881+
1882+
cnt += p.second.InitDone;
18751883
}
18761884
TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx);
18771885

18781886
for (auto& p : Partitions) {
1879-
if (!p.second.InitDone)
1887+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
18801888
continue;
1889+
}
1890+
18811891
THolder<TEvPQ::TEvPartitionOffsets> event = MakeHolder<TEvPQ::TEvPartitionOffsets>(ans, ev->Get()->Record.HasClientId() ?
18821892
ev->Get()->Record.GetClientId() : "");
18831893
ctx.Send(p.second.Actor, event.Release());
@@ -1935,15 +1945,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
19351945
}
19361946

19371947
ui32 cnt = 0;
1938-
for (auto& [_, partitionInfo] : Partitions) {
1939-
cnt += partitionInfo.InitDone;
1948+
for (auto& [partitionId, partitionInfo] : Partitions) {
1949+
if (partitionId.IsSupportivePartition()) {
1950+
continue;
1951+
}
1952+
1953+
cnt += partitionInfo.InitDone;
19401954
}
19411955

19421956
TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
19431957
for (auto& p : Partitions) {
1944-
if (!p.second.InitDone) {
1958+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
19451959
continue;
19461960
}
1961+
19471962
THolder<TEvPQ::TEvPartitionStatus> event;
19481963
if (ev->Get()->Record.GetConsumers().empty()) {
19491964
event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "",
@@ -4554,7 +4569,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
45544569
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45554570
TDistributedTransaction& tx)
45564571
{
4557-
for (auto& [_, partition] : Partitions) {
4572+
for (auto& [partitionId, partition] : Partitions) {
4573+
if (partitionId.IsSupportivePartition()) {
4574+
continue;
4575+
}
4576+
45584577
auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);
45594578

45604579
event->TopicConverter = tx.TopicConverter;
@@ -4565,7 +4584,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45654584
}
45664585

45674586
tx.PartitionRepliesCount = 0;
4568-
tx.PartitionRepliesExpected = Partitions.size();
4587+
tx.PartitionRepliesExpected = OriginalPartitionsCount;
45694588
}
45704589

45714590
TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
211211
{
212212
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
213213
settings.SetEnableTopicServiceTx(true);
214+
settings.SetEnablePQConfigTransactionsAtSchemeShard(true);
214215

215216
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
216217

@@ -2097,6 +2098,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
20972098
WriteMessagesInTx(0, 1);
20982099
}
20992100

2101+
Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
2102+
{
2103+
CreateTopic("topic_A", TEST_CONSUMER);
2104+
2105+
NTable::TSession tableSession = CreateTableSession();
2106+
NTable::TTransaction tx = BeginTx(tableSession);
2107+
2108+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
2109+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
2110+
2111+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2112+
2113+
AddConsumer("topic_A", {"consumer"});
2114+
2115+
CommitTx(tx, EStatus::SUCCESS);
2116+
2117+
auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2));
2118+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2119+
}
2120+
21002121
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
21012122
{
21022123
// There was a server

0 commit comments

Comments
 (0)