Skip to content

Commit c7ba91f

Browse files
committed
Fix autopartitioning of topics with path that is not root of database (ydb-platform#10609)
1 parent 41ca9ba commit c7ba91f

10 files changed

+93
-15
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ namespace NPQ {
88

99
TPartitionScaleManager::TPartitionScaleManager(
1010
const TString& topicName,
11+
const TString& topicPath,
1112
const TString& databasePath,
1213
ui64 pathId,
1314
int version,
1415
const NKikimrPQ::TPQTabletConfig& config
1516
)
1617
: TopicName(topicName)
18+
, TopicPath(topicPath)
1719
, DatabasePath(databasePath)
1820
, BalancerConfig(pathId, version, config) {
1921
}
@@ -45,6 +47,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
4547
<< "send split request");
4648
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4749
TopicName,
50+
TopicPath,
4851
DatabasePath,
4952
BalancerConfig.PathId,
5053
BalancerConfig.PathVersion,

ydb/core/persqueue/partition_scale_manager.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class TPartitionScaleManager {
4747
};
4848

4949
public:
50-
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
50+
TPartitionScaleManager(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
5151

5252
public:
5353
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
@@ -71,6 +71,7 @@ class TPartitionScaleManager {
7171
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;
7272

7373
const TString TopicName;
74+
const TString TopicPath;
7475
TString DatabasePath = "";
7576
TActorId CurrentScaleRequest;
7677
TDuration RequestTimeout = TDuration::MilliSeconds(0);

ydb/core/persqueue/partition_scale_request.cpp

+14-9
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ namespace NKikimr {
44
namespace NPQ {
55

66
TPartitionScaleRequest::TPartitionScaleRequest(
7-
TString topicName,
8-
TString databasePath,
7+
const TString& topicName,
8+
const TString& topicPath,
9+
const TString& databasePath,
910
ui64 pathId,
1011
ui64 pathVersion,
11-
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12-
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
13-
NActors::TActorId parentActorId
12+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
13+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
14+
const NActors::TActorId& parentActorId
1415
)
1516
: Topic(topicName)
17+
, TopicPath(topicPath)
1618
, DatabasePath(databasePath)
1719
, PathId(pathId)
1820
, PathVersion(pathVersion)
@@ -30,24 +32,27 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
3032
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
3133
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
3234
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
33-
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
35+
FillProposeRequest(*proposal, ctx);
3436
ctx.Send(MakeTxProxyID(), proposal.release());
3537
}
3638

37-
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
39+
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) {
40+
auto workingDir = TopicPath.substr(0, TopicPath.size() - Topic.size());
41+
3842
auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
3943
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
4044
modifyScheme.SetWorkingDir(workingDir);
45+
modifyScheme.SetInternal(true);
4146

4247
auto applyIf = modifyScheme.AddApplyIf();
4348
applyIf->SetPathId(PathId);
4449
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
4550
applyIf->SetCheckEntityVersion(true);
4651

4752
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
48-
groupDescription.SetName(topicName);
53+
groupDescription.SetName(Topic);
4954
TStringBuilder logMessage;
50-
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
55+
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions of '" << workingDir << "/" << Topic << "'. Spilts: ";
5156
for(const auto& split: Splits) {
5257
auto* newSplit = groupDescription.AddSplit();
5358
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";

ydb/core/persqueue/partition_scale_request.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
2626
};
2727

2828
public:
29-
TPartitionScaleRequest(TString topicName, TString databasePath, ui64 pathId, ui64 pathVersion, std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits, const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges, NActors::TActorId parentActorId);
29+
TPartitionScaleRequest(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, ui64 pathVersion,
30+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
31+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
32+
const NActors::TActorId& parentActorId);
3033

3134
public:
3235
void Bootstrap(const NActors::TActorContext &ctx);
@@ -48,10 +51,11 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
4851
}
4952
std::pair<TString, TString> SplitPath(const TString& path);
5053
void SendProposeRequest(const NActors::TActorContext &ctx);
51-
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);
54+
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx);
5255

5356
private:
5457
const TString Topic;
58+
const TString TopicPath;
5559
const TString DatabasePath;
5660
const ui64 PathId;
5761
const ui64 PathVersion;

ydb/core/persqueue/read_balancer.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
537537

538538
if (SplitMergeEnabled(TabletConfig)) {
539539
if (!PartitionsScaleManager) {
540-
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig);
540+
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, Path, DatabasePath, PathId, Version, TabletConfig);
541541
} else {
542542
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
543543
}
@@ -1266,16 +1266,20 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
12661266

12671267
void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) {
12681268
if (!SplitMergeEnabled(TabletConfig)) {
1269+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: autopartitioning disabled.");
12691270
return;
12701271
}
12711272
auto& record = ev->Get()->Record;
12721273
auto* node = PartitionGraph.GetPartition(record.GetPartitionId());
12731274
if (!node) {
1275+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: partition " << record.GetPartitionId() << " not found.");
12741276
return;
12751277
}
12761278

12771279
if (PartitionsScaleManager) {
12781280
PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx);
1281+
} else {
1282+
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: scale manager isn`t initialized.");
12791283
}
12801284
}
12811285

ydb/core/persqueue/read_balancer__txinit.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
6060
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
6161

6262
if (SplitMergeEnabled(Self->TabletConfig)) {
63-
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
63+
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
6464
}
6565
Self->UpdateConfigCounters();
6666
}

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ TTopicSdkTestSetup CreateSetup() {
9696
NKikimrConfig::TFeatureFlags ff;
9797
ff.SetEnableTopicSplitMerge(true);
9898
ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
99-
//ff.SetEnableTopicServiceTx(true);
99+
ff.SetEnableTopicServiceTx(true);
100100

101101
auto settings = TTopicSdkTestSetup::MakeServerSettings();
102102
settings.SetFeatureFlags(ff);

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

+54
Original file line numberDiff line numberDiff line change
@@ -894,6 +894,60 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
894894
}
895895
}
896896

897+
void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
898+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
899+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
900+
}
901+
902+
Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
903+
TTopicSdkTestSetup setup = CreateSetup();
904+
auto client = setup.MakeClient();
905+
auto tableClient = setup.MakeTableClient();
906+
auto session = tableClient.CreateSession().GetValueSync().GetSession();
907+
908+
setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");
909+
910+
ExecuteQuery(session, R"(
911+
--!syntax_v1
912+
CREATE TOPIC `/Root/dir/origin`
913+
WITH (
914+
AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
915+
MAX_ACTIVE_PARTITIONS = 50
916+
);
917+
)");
918+
919+
{
920+
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
921+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
922+
}
923+
924+
ui64 balancerTabletId;
925+
{
926+
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/dir/origin")->Record.GetPathDescription().GetSelf();
927+
balancerTabletId = pathDescr.GetBalancerTabletID();
928+
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
929+
UNIT_ASSERT(balancerTabletId);
930+
}
931+
932+
{
933+
const auto edge = setup.GetRuntime().AllocateEdgeActor();
934+
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
935+
}
936+
937+
{
938+
size_t partitionCount = 0;
939+
for (size_t i = 0; i < 10; ++i) {
940+
Sleep(TDuration::Seconds(1));
941+
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
942+
partitionCount = describe.GetTopicDescription().GetPartitions().size();
943+
if (partitionCount == 3) {
944+
break;
945+
}
946+
}
947+
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
948+
}
949+
}
950+
897951
Y_UNIT_TEST(MidOfRange) {
898952
auto AsString = [](std::vector<ui16> vs) {
899953
TStringBuilder a;

ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const
136136
{
137137
return TTopicClient(MakeDriver());
138138
}
139+
140+
NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const
141+
{
142+
return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings()
143+
.UseQueryCache(false));
144+
}

ydb/public/sdk/cpp/client/ydb_topic/ut/ut_utils/topic_sdk_test_setup.h

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class TTopicSdkTestSetup {
3333
TLog& GetLog();
3434

3535
TTopicClient MakeClient() const;
36+
NYdb::NTable::TTableClient MakeTableClient() const;
3637

3738
TDriver MakeDriver() const;
3839
TDriver MakeDriver(const TDriverConfig& config) const;

0 commit comments

Comments
 (0)