Skip to content

Commit d317ae4

Browse files
The consumer's generation number is not stored in the transaction (#9590)
1 parent 0274cfd commit d317ae4

File tree

3 files changed

+150
-20
lines changed

3 files changed

+150
-20
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
715715
{
716716
Config = newConfig;
717717

718+
PQ_LOG_D("Apply new config " << Config.ShortDebugString());
719+
718720
ui32 cacheSize = CACHE_SIZE;
719721
if (Config.HasCacheSize()) {
720722
cacheSize = Config.GetCacheSize();
@@ -1630,6 +1632,30 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16301632
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
16311633
}
16321634

1635+
void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
1636+
{
1637+
Y_ABORT_UNLESS(cfg.HasVersion());
1638+
const int curConfigVersion = cfg.GetVersion();
1639+
1640+
// set rr generation for provided read rules
1641+
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1642+
for (const auto& c : Config.GetConsumers()) {
1643+
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1644+
}
1645+
1646+
for (auto& c : *cfg.MutableConsumers()) {
1647+
auto it = existed.find(c.GetName());
1648+
ui64 generation = 0;
1649+
if (it != existed.end() && it->second.first == c.GetVersion()) {
1650+
generation = it->second.second;
1651+
} else {
1652+
generation = curConfigVersion;
1653+
}
1654+
c.SetGeneration(generation);
1655+
cfg.AddReadRuleGenerations(generation);
1656+
}
1657+
}
1658+
16331659
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16341660
{
16351661
const auto& record = ev->GetRecord();
@@ -1642,7 +1668,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
16421668
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();
16431669

16441670
Y_ABORT_UNLESS(cfg.HasVersion());
1645-
int curConfigVersion = cfg.GetVersion();
1671+
const int curConfigVersion = cfg.GetVersion();
16461672

16471673
if (curConfigVersion == oldConfigVersion) { //already applied
16481674
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
@@ -1741,25 +1767,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
17411767

17421768
Migrate(cfg);
17431769

1744-
// set rr generation for provided read rules
1745-
{
1746-
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1747-
for (const auto& c : Config.GetConsumers()) {
1748-
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1749-
}
1750-
1751-
for (auto& c : *cfg.MutableConsumers()) {
1752-
auto it = existed.find(c.GetName());
1753-
ui64 generation = 0;
1754-
if (it != existed.end() && it->second.first == c.GetVersion()) {
1755-
generation = it->second.second;
1756-
} else {
1757-
generation = curConfigVersion;
1758-
}
1759-
c.SetGeneration(generation);
1760-
cfg.AddReadRuleGenerations(generation);
1761-
}
1762-
}
1770+
UpdateReadRuleGenerations(cfg);
17631771

17641772
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
17651773
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
@@ -3725,6 +3733,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37253733
tx.OnProposeTransaction(event, GetAllowedStep(),
37263734
TabletID());
37273735

3736+
if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
3737+
UpdateReadRuleGenerations(tx.TabletConfig);
3738+
}
3739+
37283740
if (tx.WriteId.Defined()) {
37293741
const TWriteId& writeId = *tx.WriteId;
37303742
Y_ABORT_UNLESS(TxWrites.contains(writeId),

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
530530

531531
bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
532532
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
533+
534+
void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
533535
};
534536

535537

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
44
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
55
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
6+
#include <ydb/core/cms/console/console.h>
67
#include <ydb/core/keyvalue/keyvalue_events.h>
78
#include <ydb/core/persqueue/key.h>
89
#include <ydb/core/persqueue/blob.h>
@@ -42,8 +43,14 @@ class TFixture : public NUnitTest::TBaseFixture {
4243
void WaitForEvent();
4344
};
4445

46+
struct TFeatureFlags {
47+
bool EnablePQConfigTransactionsAtSchemeShard = true;
48+
};
49+
4550
void SetUp(NUnitTest::TTestContext&) override;
4651

52+
void NotifySchemeShard(const TFeatureFlags& flags);
53+
4754
NTable::TSession CreateTableSession();
4855
NTable::TTransaction BeginTx(NTable::TSession& session);
4956
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -67,6 +74,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6774
std::optional<size_t> maxPartitionCount = std::nullopt);
6875
void DescribeTopic(const TString& path);
6976

77+
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
78+
7079
void WriteToTopicWithInvalidTxId(bool invalidTxId);
7180

7281
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -101,6 +110,8 @@ class TFixture : public NUnitTest::TBaseFixture {
101110
NYdb::EStatus status);
102111
void CloseTopicWriteSession(const TString& topicPath,
103112
const TString& messageGroupId);
113+
void CloseTopicReadSession(const TString& topicPath,
114+
const TString& consumerName);
104115

105116
enum EEndOfTransaction {
106117
Commit,
@@ -181,6 +192,8 @@ class TFixture : public NUnitTest::TBaseFixture {
181192
ui64 tabletId,
182193
const NPQ::TWriteId& writeId);
183194

195+
ui64 GetSchemeShardTabletId(const TActorId& actorId);
196+
184197
std::unique_ptr<TTopicSdkTestSetup> Setup;
185198
std::unique_ptr<TDriver> Driver;
186199

@@ -198,11 +211,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
198211
{
199212
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
200213
settings.SetEnableTopicServiceTx(true);
214+
201215
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
202216

203217
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
204218
}
205219

220+
void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
221+
{
222+
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
223+
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
224+
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);
225+
226+
auto& runtime = Setup->GetRuntime();
227+
auto actorId = runtime.AllocateEdgeActor();
228+
229+
ui64 ssId = GetSchemeShardTabletId(actorId);
230+
231+
runtime.SendToPipe(ssId, actorId, request.release());
232+
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
233+
}
234+
206235
NTable::TSession TFixture::CreateTableSession()
207236
{
208237
NTable::TTableClient client(GetDriver());
@@ -329,6 +358,20 @@ void TFixture::CreateTopic(const TString& path,
329358
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
330359
}
331360

361+
void TFixture::AddConsumer(const TString& path,
362+
const TVector<TString>& consumers)
363+
{
364+
NTopic::TTopicClient client(GetDriver());
365+
NTopic::TAlterTopicSettings settings;
366+
367+
for (const auto& consumer : consumers) {
368+
settings.BeginAddConsumer(consumer);
369+
}
370+
371+
auto result = client.AlterTopic(path, settings).GetValueSync();
372+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
373+
}
374+
332375
void TFixture::DescribeTopic(const TString& path)
333376
{
334377
Setup->DescribeTopic(path);
@@ -663,6 +706,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
663706
TopicWriteSessions.erase(key);
664707
}
665708

709+
void TFixture::CloseTopicReadSession(const TString& topicPath,
710+
const TString& consumerName)
711+
{
712+
Y_UNUSED(consumerName);
713+
TopicReadSessions.erase(topicPath);
714+
}
715+
666716
void TFixture::WriteToTopic(const TString& topicPath,
667717
const TString& messageGroupId,
668718
const TString& message,
@@ -779,6 +829,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
779829
UNIT_ASSERT(context.AckCount() <= context.WriteCount);
780830
}
781831

832+
ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
833+
{
834+
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
835+
navigate->DatabaseName = "/Root";
836+
837+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
838+
entry.Path = SplitPath("/Root");
839+
entry.SyncVersion = true;
840+
entry.ShowPrivatePath = true;
841+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
842+
843+
navigate->ResultSet.push_back(std::move(entry));
844+
//navigate->UserToken = "root@builtin";
845+
navigate->Cookie = 12345;
846+
847+
auto& runtime = Setup->GetRuntime();
848+
849+
runtime.Send(MakeSchemeCacheID(), actorId,
850+
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
851+
0,
852+
true);
853+
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
854+
855+
UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
856+
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);
857+
858+
auto& front = response->Request->ResultSet.front();
859+
860+
return front.Self->Info.GetSchemeshardId();
861+
}
862+
782863
ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
783864
{
784865
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -2016,6 +2097,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
20162097
WriteMessagesInTx(0, 1);
20172098
}
20182099

2100+
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2101+
{
2102+
// There was a server
2103+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});
2104+
2105+
// Users have created their own topic on it
2106+
CreateTopic(TEST_TOPIC);
2107+
2108+
// And they wrote their messages into it
2109+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
2110+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
2111+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");
2112+
2113+
// And he had a consumer
2114+
AddConsumer(TEST_TOPIC, {"consumer-1"});
2115+
2116+
// We read messages from the topic and committed offsets
2117+
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2118+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
2119+
CloseTopicReadSession(TEST_TOPIC, "consumer-1");
2120+
2121+
// And then the Logbroker team turned on the feature flag
2122+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});
2123+
2124+
// Users continued to write to the topic
2125+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");
2126+
2127+
// Users have added new consumers
2128+
AddConsumer(TEST_TOPIC, {"consumer-2"});
2129+
2130+
// And they wanted to continue reading their messages
2131+
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2132+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
2133+
}
2134+
20192135
}
20202136

20212137
}

0 commit comments

Comments
 (0)