Skip to content

Commit 492d722

Browse files
The TEvProposePartitionConfig message is sent only to the main partitions (#9599) (#9632)
1 parent 45f8e08 commit 492d722

File tree

2 files changed

+55
-14
lines changed

2 files changed

+55
-14
lines changed

ydb/core/persqueue/pq_impl.cpp

+27-8
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped<TMonitoringProxy> {
554554
, TabletID(tabletId)
555555
, Inflight(inflight)
556556
{
557-
for (auto& p: Partitions) {
557+
for (auto& p : Partitions) {
558558
Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first));
559559
}
560560
}
@@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
691691
ClearNewConfig();
692692

693693
for (auto& p : Partitions) { //change config for already created partitions
694+
if (p.first.IsSupportivePartition()) {
695+
continue;
696+
}
697+
694698
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
695699
}
696700
ChangePartitionConfigInflight += Partitions.size();
@@ -1873,13 +1877,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext&
18731877
}
18741878
ui32 cnt = 0;
18751879
for (auto& p : Partitions) {
1876-
cnt += p.second.InitDone;
1880+
if (p.first.IsSupportivePartition()) {
1881+
continue;
1882+
}
1883+
1884+
cnt += p.second.InitDone;
18771885
}
18781886
TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx);
18791887

18801888
for (auto& p : Partitions) {
1881-
if (!p.second.InitDone)
1889+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
18821890
continue;
1891+
}
1892+
18831893
THolder<TEvPQ::TEvPartitionOffsets> event = MakeHolder<TEvPQ::TEvPartitionOffsets>(ans, ev->Get()->Record.HasClientId() ?
18841894
ev->Get()->Record.GetClientId() : "");
18851895
ctx.Send(p.second.Actor, event.Release());
@@ -1937,15 +1947,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
19371947
}
19381948

19391949
ui32 cnt = 0;
1940-
for (auto& [_, partitionInfo] : Partitions) {
1941-
cnt += partitionInfo.InitDone;
1950+
for (auto& [partitionId, partitionInfo] : Partitions) {
1951+
if (partitionId.IsSupportivePartition()) {
1952+
continue;
1953+
}
1954+
1955+
cnt += partitionInfo.InitDone;
19421956
}
19431957

19441958
TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
19451959
for (auto& p : Partitions) {
1946-
if (!p.second.InitDone) {
1960+
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
19471961
continue;
19481962
}
1963+
19491964
THolder<TEvPQ::TEvPartitionStatus> event;
19501965
if (ev->Get()->Record.GetConsumers().empty()) {
19511966
event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "",
@@ -4556,7 +4571,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
45564571
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45574572
TDistributedTransaction& tx)
45584573
{
4559-
for (auto& [_, partition] : Partitions) {
4574+
for (auto& [partitionId, partition] : Partitions) {
4575+
if (partitionId.IsSupportivePartition()) {
4576+
continue;
4577+
}
4578+
45604579
auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);
45614580

45624581
event->TopicConverter = tx.TopicConverter;
@@ -4567,7 +4586,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
45674586
}
45684587

45694588
tx.PartitionRepliesCount = 0;
4570-
tx.PartitionRepliesExpected = Partitions.size();
4589+
tx.PartitionRepliesExpected = OriginalPartitionsCount;
45714590
}
45724591

45734592
TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

+28-6
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6969
std::optional<size_t> maxPartitionCount = std::nullopt);
7070
void DescribeTopic(const TString& path);
7171

72-
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
72+
void AddConsumer(const TString& path,
73+
const TVector<TString>& consumers);
7374

7475
void WriteToTopicWithInvalidTxId(bool invalidTxId);
7576

@@ -205,6 +206,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
205206
{
206207
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
207208
settings.SetEnableTopicServiceTx(true);
209+
settings.SetEnablePQConfigTransactionsAtSchemeShard(true);
208210

209211
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
210212

@@ -352,6 +354,11 @@ void TFixture::CreateTopic(const TString& path,
352354
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
353355
}
354356

357+
void TFixture::DescribeTopic(const TString& path)
358+
{
359+
Setup->DescribeTopic(path);
360+
}
361+
355362
void TFixture::AddConsumer(const TString& path,
356363
const TVector<TString>& consumers)
357364
{
@@ -366,11 +373,6 @@ void TFixture::AddConsumer(const TString& path,
366373
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
367374
}
368375

369-
void TFixture::DescribeTopic(const TString& path)
370-
{
371-
Setup->DescribeTopic(path);
372-
}
373-
374376
const TDriver& TFixture::GetDriver() const
375377
{
376378
return *Driver;
@@ -2079,6 +2081,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
20792081
WriteMessagesInTx(0, 1);
20802082
}
20812083

2084+
Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
2085+
{
2086+
CreateTopic("topic_A", TEST_CONSUMER);
2087+
2088+
NTable::TSession tableSession = CreateTableSession();
2089+
NTable::TTransaction tx = BeginTx(tableSession);
2090+
2091+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
2092+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
2093+
2094+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
2095+
2096+
AddConsumer("topic_A", {"consumer"});
2097+
2098+
CommitTx(tx, EStatus::SUCCESS);
2099+
2100+
auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2));
2101+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2102+
}
2103+
20822104
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
20832105
{
20842106
// There was a server

0 commit comments

Comments
 (0)