Skip to content

Commit a155f1c

Browse files
authored
Fix autopartitioning of topics with path that is not root of dat… (#10593)
1 parent cebe29d commit a155f1c

File tree

10 files changed

+102
-15
lines changed

10 files changed

+102
-15
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ namespace NPQ {
88

99
TPartitionScaleManager::TPartitionScaleManager(
1010
const TString& topicName,
11+
const TString& topicPath,
1112
const TString& databasePath,
1213
ui64 pathId,
1314
int version,
1415
const NKikimrPQ::TPQTabletConfig& config,
1516
const TPartitionGraph& partitionGraph
1617
)
1718
: TopicName(topicName)
19+
, TopicPath(topicPath)
1820
, DatabasePath(databasePath)
1921
, BalancerConfig(pathId, version, config)
2022
, PartitionGraph(partitionGraph) {
@@ -47,6 +49,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
4749
<< "send split request");
4850
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4951
TopicName,
52+
TopicPath,
5053
DatabasePath,
5154
BalancerConfig.PathId,
5255
BalancerConfig.PathVersion,

ydb/core/persqueue/partition_scale_manager.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class TPartitionScaleManager {
4545
};
4646

4747
public:
48-
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config, const TPartitionGraph& partitionGraph);
48+
TPartitionScaleManager(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config, const TPartitionGraph& partitionGraph);
4949

5050
public:
5151
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
@@ -69,6 +69,7 @@ class TPartitionScaleManager {
6969
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;
7070

7171
const TString TopicName;
72+
const TString TopicPath;
7273
TString DatabasePath = "";
7374
TActorId CurrentScaleRequest;
7475
TDuration RequestTimeout = TDuration::MilliSeconds(0);

ydb/core/persqueue/partition_scale_request.cpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ namespace NKikimr {
44
namespace NPQ {
55

66
TPartitionScaleRequest::TPartitionScaleRequest(
7-
TString topicName,
8-
TString databasePath,
7+
const TString& topicName,
8+
const TString& topicPath,
9+
const TString& databasePath,
910
ui64 pathId,
1011
ui64 pathVersion,
11-
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12-
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
13-
NActors::TActorId parentActorId
12+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
13+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
14+
const NActors::TActorId& parentActorId
1415
)
1516
: Topic(topicName)
17+
, TopicPath(topicPath)
1618
, DatabasePath(databasePath)
1719
, PathId(pathId)
1820
, PathVersion(pathVersion)
@@ -30,24 +32,27 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
3032
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
3133
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
3234
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
33-
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
35+
FillProposeRequest(*proposal, ctx);
3436
ctx.Send(MakeTxProxyID(), proposal.release());
3537
}
3638

37-
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
39+
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) {
40+
auto workingDir = TopicPath.substr(0, TopicPath.size() - Topic.size());
41+
3842
auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
3943
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
4044
modifyScheme.SetWorkingDir(workingDir);
45+
modifyScheme.SetInternal(true);
4146

4247
auto applyIf = modifyScheme.AddApplyIf();
4348
applyIf->SetPathId(PathId);
4449
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
4550
applyIf->SetCheckEntityVersion(true);
4651

4752
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
48-
groupDescription.SetName(topicName);
53+
groupDescription.SetName(Topic);
4954
TStringBuilder logMessage;
50-
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
55+
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions of '" << workingDir << "/" << Topic << "'. Spilts: ";
5156
for(const auto& split: Splits) {
5257
auto* newSplit = groupDescription.AddSplit();
5358
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";

ydb/core/persqueue/partition_scale_request.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
2626
};
2727

2828
public:
29-
TPartitionScaleRequest(TString topicName, TString databasePath, ui64 pathId, ui64 pathVersion, std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits, const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges, NActors::TActorId parentActorId);
29+
TPartitionScaleRequest(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, ui64 pathVersion,
30+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
31+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
32+
const NActors::TActorId& parentActorId);
3033

3134
public:
3235
void Bootstrap(const NActors::TActorContext &ctx);
@@ -48,10 +51,11 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
4851
}
4952
std::pair<TString, TString> SplitPath(const TString& path);
5053
void SendProposeRequest(const NActors::TActorContext &ctx);
51-
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);
54+
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx);
5255

5356
private:
5457
const TString Topic;
58+
const TString TopicPath;
5559
const TString DatabasePath;
5660
const ui64 PathId;
5761
const ui64 PathVersion;

ydb/core/persqueue/read_balancer.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
537537

538538
if (SplitMergeEnabled(TabletConfig)) {
539539
if (!PartitionsScaleManager) {
540-
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig, PartitionGraph);
540+
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, Path, DatabasePath, PathId, Version, TabletConfig, PartitionGraph);
541541
} else {
542542
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
543543
}
@@ -1266,16 +1266,20 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
12661266

12671267
void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) {
12681268
if (!SplitMergeEnabled(TabletConfig)) {
1269+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: autopartitioning disabled.");
12691270
return;
12701271
}
12711272
auto& record = ev->Get()->Record;
12721273
auto* node = PartitionGraph.GetPartition(record.GetPartitionId());
12731274
if (!node) {
1275+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: partition " << record.GetPartitionId() << " not found.");
12741276
return;
12751277
}
12761278

12771279
if (PartitionsScaleManager) {
12781280
PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx);
1281+
} else {
1282+
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: scale manager isn`t initialized.");
12791283
}
12801284
}
12811285

ydb/core/persqueue/read_balancer__txinit.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
6060
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
6161

6262
if (SplitMergeEnabled(Self->TabletConfig)) {
63-
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph);
63+
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig, Self->PartitionGraph);
6464
}
6565
Self->UpdateConfigCounters();
6666
}

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ TTopicSdkTestSetup CreateSetup() {
9696
NKikimrConfig::TFeatureFlags ff;
9797
ff.SetEnableTopicSplitMerge(true);
9898
ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
99-
//ff.SetEnableTopicServiceTx(true);
99+
ff.SetEnableTopicServiceTx(true);
100+
ff.SetEnableTopicAutopartitioningForCDC(true);
100101

101102
auto settings = TTopicSdkTestSetup::MakeServerSettings();
102103
settings.SetFeatureFlags(ff);

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -924,6 +924,68 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
924924
}
925925
}
926926

927+
void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
928+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
929+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
930+
}
931+
932+
Y_UNIT_TEST(CDC_PartitionSplit_AutosplitByLoad) {
933+
TTopicSdkTestSetup setup = CreateSetup();
934+
auto client = setup.MakeClient();
935+
auto tableClient = setup.MakeTableClient();
936+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
937+
938+
ExecuteQuery(session, R"(
939+
--!syntax_v1
940+
CREATE TABLE `/Root/origin` (
941+
id Uint64,
942+
value Text,
943+
PRIMARY KEY (id)
944+
);
945+
)");
946+
947+
ExecuteQuery(session, R"(
948+
--!syntax_v1
949+
ALTER TABLE `/Root/origin`
950+
ADD CHANGEFEED `feed` WITH (
951+
MODE = 'UPDATES',
952+
FORMAT = 'JSON',
953+
TOPIC_AUTO_PARTITIONING = 'ENABLED'
954+
);
955+
)");
956+
957+
{
958+
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
959+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
960+
}
961+
962+
ui64 balancerTabletId;
963+
{
964+
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/origin/feed/streamImpl")->Record.GetPathDescription().GetSelf();
965+
balancerTabletId = pathDescr.GetBalancerTabletID();
966+
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
967+
UNIT_ASSERT(balancerTabletId);
968+
}
969+
970+
{
971+
const auto edge = setup.GetRuntime().AllocateEdgeActor();
972+
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
973+
}
974+
975+
{
976+
size_t partitionCount = 0;
977+
for (size_t i = 0; i < 10; ++i) {
978+
Sleep(TDuration::Seconds(1));
979+
auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
980+
partitionCount = describe.GetTopicDescription().GetPartitions().size();
981+
if (partitionCount == 3) {
982+
break;
983+
}
984+
}
985+
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
986+
}
987+
}
988+
927989
Y_UNIT_TEST(MidOfRange) {
928990
auto AsString = [](std::vector<ui16> vs) {
929991
TStringBuilder a;

ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const
136136
{
137137
return TTopicClient(MakeDriver());
138138
}
139+
140+
NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const
141+
{
142+
return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings()
143+
.UseQueryCache(false));
144+
}

ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class TTopicSdkTestSetup {
3333
TLog& GetLog();
3434

3535
TTopicClient MakeClient() const;
36+
NYdb::NTable::TTableClient MakeTableClient() const;
3637

3738
TDriver MakeDriver() const;
3839
TDriver MakeDriver(const TDriverConfig& config) const;

0 commit comments

Comments
 (0)