Skip to content

Commit be211bb

Browse files
The consumer's generation number is not stored in the transaction (#9590) (#9619)
1 parent 3c1c2f7 commit be211bb

File tree

3 files changed

+152
-22
lines changed

3 files changed

+152
-22
lines changed

ydb/core/persqueue/pq_impl.cpp

+34-22
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
715715
{
716716
Config = newConfig;
717717

718+
PQ_LOG_D("Apply new config " << Config.ShortDebugString());
719+
718720
ui32 cacheSize = CACHE_SIZE;
719721
if (Config.HasCacheSize()) {
720722
cacheSize = Config.GetCacheSize();
@@ -1630,6 +1632,32 @@ void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
16301632
Y_ABORT_UNLESS(topicConverter->IsValid(), "%s", topicConverter->GetReason().c_str());
16311633
}
16321634

1635+
void TPersQueue::UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const
1636+
{
1637+
Y_ABORT_UNLESS(cfg.HasVersion());
1638+
const int curConfigVersion = cfg.GetVersion();
1639+
1640+
// set rr generation for provided read rules
1641+
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1642+
for (const auto& c : Config.GetConsumers()) {
1643+
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1644+
}
1645+
1646+
for (auto& c : *cfg.MutableConsumers()) {
1647+
auto it = existed.find(c.GetName());
1648+
ui64 generation = 0;
1649+
if (it != existed.end() && it->second.first == c.GetVersion()) {
1650+
generation = it->second.second;
1651+
} else {
1652+
generation = curConfigVersion;
1653+
}
1654+
c.SetGeneration(generation);
1655+
if (ReadRuleCompatible()) {
1656+
cfg.AddReadRuleGenerations(generation);
1657+
}
1658+
}
1659+
}
1660+
16331661
void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConfig> ev, const TActorId& sender, const TActorContext& ctx)
16341662
{
16351663
const auto& record = ev->GetRecord();
@@ -1642,7 +1670,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
16421670
NKikimrPQ::TPQTabletConfig cfg = record.GetTabletConfig();
16431671

16441672
Y_ABORT_UNLESS(cfg.HasVersion());
1645-
int curConfigVersion = cfg.GetVersion();
1673+
const int curConfigVersion = cfg.GetVersion();
16461674

16471675
if (curConfigVersion == oldConfigVersion) { //already applied
16481676
LOG_INFO_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
@@ -1741,27 +1769,7 @@ void TPersQueue::ProcessUpdateConfigRequest(TAutoPtr<TEvPersQueue::TEvUpdateConf
17411769

17421770
Migrate(cfg);
17431771

1744-
// set rr generation for provided read rules
1745-
{
1746-
THashMap<TString, std::pair<ui64, ui64>> existed; // map name -> rrVersion, rrGeneration
1747-
for (const auto& c : Config.GetConsumers()) {
1748-
existed[c.GetName()] = std::make_pair(c.GetVersion(), c.GetGeneration());
1749-
}
1750-
1751-
for (auto& c : *cfg.MutableConsumers()) {
1752-
auto it = existed.find(c.GetName());
1753-
ui64 generation = 0;
1754-
if (it != existed.end() && it->second.first == c.GetVersion()) {
1755-
generation = it->second.second;
1756-
} else {
1757-
generation = curConfigVersion;
1758-
}
1759-
c.SetGeneration(generation);
1760-
if (ReadRuleCompatible()) {
1761-
cfg.AddReadRuleGenerations(generation);
1762-
}
1763-
}
1764-
}
1772+
UpdateReadRuleGenerations(cfg);
17651773

17661774
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID()
17671775
<< " Config update version " << cfg.GetVersion() << "(current " << Config.GetVersion() << ") received from actor " << sender
@@ -3727,6 +3735,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
37273735
tx.OnProposeTransaction(event, GetAllowedStep(),
37283736
TabletID());
37293737

3738+
if (tx.Kind == NKikimrPQ::TTransaction::KIND_CONFIG) {
3739+
UpdateReadRuleGenerations(tx.TabletConfig);
3740+
}
3741+
37303742
if (tx.WriteId.Defined()) {
37313743
const TWriteId& writeId = *tx.WriteId;
37323744
Y_ABORT_UNLESS(TxWrites.contains(writeId),

ydb/core/persqueue/pq_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
530530

531531
bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
532532
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
533+
534+
void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
533535
};
534536

535537

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

+116
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
44
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
55
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
6+
#include <ydb/core/cms/console/console.h>
67
#include <ydb/core/keyvalue/keyvalue_events.h>
78
#include <ydb/core/persqueue/key.h>
89
#include <ydb/core/persqueue/blob.h>
@@ -37,8 +38,14 @@ class TFixture : public NUnitTest::TBaseFixture {
3738
void Write(const TString& message, NTable::TTransaction* tx = nullptr);
3839
};
3940

41+
struct TFeatureFlags {
42+
bool EnablePQConfigTransactionsAtSchemeShard = true;
43+
};
44+
4045
void SetUp(NUnitTest::TTestContext&) override;
4146

47+
void NotifySchemeShard(const TFeatureFlags& flags);
48+
4249
NTable::TSession CreateTableSession();
4350
NTable::TTransaction BeginTx(NTable::TSession& session);
4451
void CommitTx(NTable::TTransaction& tx, EStatus status = EStatus::SUCCESS);
@@ -62,6 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
6269
std::optional<size_t> maxPartitionCount = std::nullopt);
6370
void DescribeTopic(const TString& path);
6471

72+
void AddConsumer(const TString& topic, const TVector<TString>& consumers);
73+
6574
void WriteToTopicWithInvalidTxId(bool invalidTxId);
6675

6776
TTopicWriteSessionPtr CreateTopicWriteSession(const TString& topicPath,
@@ -95,6 +104,8 @@ class TFixture : public NUnitTest::TBaseFixture {
95104
NYdb::EStatus status);
96105
void CloseTopicWriteSession(const TString& topicPath,
97106
const TString& messageGroupId);
107+
void CloseTopicReadSession(const TString& topicPath,
108+
const TString& consumerName);
98109

99110
enum EEndOfTransaction {
100111
Commit,
@@ -175,6 +186,8 @@ class TFixture : public NUnitTest::TBaseFixture {
175186
ui64 tabletId,
176187
const NPQ::TWriteId& writeId);
177188

189+
ui64 GetSchemeShardTabletId(const TActorId& actorId);
190+
178191
std::unique_ptr<TTopicSdkTestSetup> Setup;
179192
std::unique_ptr<TDriver> Driver;
180193

@@ -192,11 +205,27 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
192205
{
193206
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
194207
settings.SetEnableTopicServiceTx(true);
208+
195209
Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);
196210

197211
Driver = std::make_unique<TDriver>(Setup->MakeDriver());
198212
}
199213

214+
void TFixture::NotifySchemeShard(const TFeatureFlags& flags)
215+
{
216+
auto request = std::make_unique<NConsole::TEvConsole::TEvConfigNotificationRequest>();
217+
*request->Record.MutableConfig() = *Setup->GetServer().ServerSettings.AppConfig;
218+
request->Record.MutableConfig()->MutableFeatureFlags()->SetEnablePQConfigTransactionsAtSchemeShard(flags.EnablePQConfigTransactionsAtSchemeShard);
219+
220+
auto& runtime = Setup->GetRuntime();
221+
auto actorId = runtime.AllocateEdgeActor();
222+
223+
ui64 ssId = GetSchemeShardTabletId(actorId);
224+
225+
runtime.SendToPipe(ssId, actorId, request.release());
226+
runtime.GrabEdgeEvent<NConsole::TEvConsole::TEvConfigNotificationResponse>();
227+
}
228+
200229
NTable::TSession TFixture::CreateTableSession()
201230
{
202231
NTable::TTableClient client(GetDriver());
@@ -323,6 +352,20 @@ void TFixture::CreateTopic(const TString& path,
323352
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
324353
}
325354

355+
void TFixture::AddConsumer(const TString& path,
356+
const TVector<TString>& consumers)
357+
{
358+
NTopic::TTopicClient client(GetDriver());
359+
NTopic::TAlterTopicSettings settings;
360+
361+
for (const auto& consumer : consumers) {
362+
settings.BeginAddConsumer(consumer);
363+
}
364+
365+
auto result = client.AlterTopic(path, settings).GetValueSync();
366+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
367+
}
368+
326369
void TFixture::DescribeTopic(const TString& path)
327370
{
328371
Setup->DescribeTopic(path);
@@ -645,6 +688,13 @@ void TFixture::CloseTopicWriteSession(const TString& topicPath,
645688
TopicWriteSessions.erase(key);
646689
}
647690

691+
void TFixture::CloseTopicReadSession(const TString& topicPath,
692+
const TString& consumerName)
693+
{
694+
Y_UNUSED(consumerName);
695+
TopicReadSessions.erase(topicPath);
696+
}
697+
648698
void TFixture::WriteToTopic(const TString& topicPath,
649699
const TString& messageGroupId,
650700
const TString& message,
@@ -763,6 +813,37 @@ void TFixture::WaitForSessionClose(const TString& topicPath,
763813
UNIT_ASSERT(context.AckCount <= context.WriteCount);
764814
}
765815

816+
ui64 TFixture::GetSchemeShardTabletId(const TActorId& actorId)
817+
{
818+
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
819+
navigate->DatabaseName = "/Root";
820+
821+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
822+
entry.Path = SplitPath("/Root");
823+
entry.SyncVersion = true;
824+
entry.ShowPrivatePath = true;
825+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
826+
827+
navigate->ResultSet.push_back(std::move(entry));
828+
//navigate->UserToken = "root@builtin";
829+
navigate->Cookie = 12345;
830+
831+
auto& runtime = Setup->GetRuntime();
832+
833+
runtime.Send(MakeSchemeCacheID(), actorId,
834+
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
835+
0,
836+
true);
837+
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
838+
839+
UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
840+
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);
841+
842+
auto& front = response->Request->ResultSet.front();
843+
844+
return front.Self->Info.GetSchemeshardId();
845+
}
846+
766847
ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
767848
{
768849
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
@@ -1998,6 +2079,41 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
19982079
WriteMessagesInTx(0, 1);
19992080
}
20002081

2082+
Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
2083+
{
2084+
// There was a server
2085+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = false});
2086+
2087+
// Users have created their own topic on it
2088+
CreateTopic(TEST_TOPIC);
2089+
2090+
// And they wrote their messages into it
2091+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-1");
2092+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-2");
2093+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-3");
2094+
2095+
// And he had a consumer
2096+
AddConsumer(TEST_TOPIC, {"consumer-1"});
2097+
2098+
// We read messages from the topic and committed offsets
2099+
auto messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2100+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
2101+
CloseTopicReadSession(TEST_TOPIC, "consumer-1");
2102+
2103+
// And then the Logbroker team turned on the feature flag
2104+
NotifySchemeShard({.EnablePQConfigTransactionsAtSchemeShard = true});
2105+
2106+
// Users continued to write to the topic
2107+
WriteToTopic(TEST_TOPIC, TEST_MESSAGE_GROUP_ID, "message-4");
2108+
2109+
// Users have added new consumers
2110+
AddConsumer(TEST_TOPIC, {"consumer-2"});
2111+
2112+
// And they wanted to continue reading their messages
2113+
messages = ReadFromTopic(TEST_TOPIC, "consumer-1", TDuration::Seconds(2));
2114+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
2115+
}
2116+
20012117
}
20022118

20032119
}

0 commit comments

Comments
 (0)