Skip to content

Fix autopartitioning of topics with path that is not root of database #10609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_scale_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ namespace NPQ {

TPartitionScaleManager::TPartitionScaleManager(
const TString& topicName,
const TString& topicPath,
const TString& databasePath,
ui64 pathId,
int version,
const NKikimrPQ::TPQTabletConfig& config
)
: TopicName(topicName)
, TopicPath(topicPath)
, DatabasePath(databasePath)
, BalancerConfig(pathId, version, config) {
}
Expand Down Expand Up @@ -45,6 +47,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
<< "send split request");
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
TopicName,
TopicPath,
DatabasePath,
BalancerConfig.PathId,
BalancerConfig.PathVersion,
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_scale_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TPartitionScaleManager {
};

public:
TPartitionScaleManager(const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);
TPartitionScaleManager(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, int version, const NKikimrPQ::TPQTabletConfig& config);

public:
void HandleScaleStatusChange(const ui32 partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx);
Expand All @@ -71,6 +71,7 @@ class TPartitionScaleManager {
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;

const TString TopicName;
const TString TopicPath;
TString DatabasePath = "";
TActorId CurrentScaleRequest;
TDuration RequestTimeout = TDuration::MilliSeconds(0);
Expand Down
23 changes: 14 additions & 9 deletions ydb/core/persqueue/partition_scale_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ namespace NKikimr {
namespace NPQ {

TPartitionScaleRequest::TPartitionScaleRequest(
TString topicName,
TString databasePath,
const TString& topicName,
const TString& topicPath,
const TString& databasePath,
ui64 pathId,
ui64 pathVersion,
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
NActors::TActorId parentActorId
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
const NActors::TActorId& parentActorId
)
: Topic(topicName)
, TopicPath(topicPath)
, DatabasePath(databasePath)
, PathId(pathId)
, PathVersion(pathVersion)
Expand All @@ -30,24 +32,27 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) {
auto proposal = std::make_unique<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(CanonizePath(DatabasePath));
FillProposeRequest(*proposal, DatabasePath, Topic, ctx);
FillProposeRequest(*proposal, ctx);
ctx.Send(MakeTxProxyID(), proposal.release());
}

void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) {
void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx) {
auto workingDir = TopicPath.substr(0, TopicPath.size() - Topic.size());

auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
modifyScheme.SetInternal(true);

auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(PathId);
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
applyIf->SetCheckEntityVersion(true);

NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
groupDescription.SetName(topicName);
groupDescription.SetName(Topic);
TStringBuilder logMessage;
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: ";
logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions of '" << workingDir << "/" << Topic << "'. Spilts: ";
for(const auto& split: Splits) {
auto* newSplit = groupDescription.AddSplit();
logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' ";
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/persqueue/partition_scale_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
};

public:
TPartitionScaleRequest(TString topicName, TString databasePath, ui64 pathId, ui64 pathVersion, std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits, const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges, NActors::TActorId parentActorId);
TPartitionScaleRequest(const TString& topicName, const TString& topicPath, const TString& databasePath, ui64 pathId, ui64 pathVersion,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit>& splits,
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge>& merges,
const NActors::TActorId& parentActorId);

public:
void Bootstrap(const NActors::TActorContext &ctx);
Expand All @@ -48,10 +51,11 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped<TPartitionScale
}
std::pair<TString, TString> SplitPath(const TString& path);
void SendProposeRequest(const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx);
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const NActors::TActorContext &ctx);

private:
const TString Topic;
const TString TopicPath;
const TString DatabasePath;
const ui64 PathId;
const ui64 PathVersion;
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr

if (SplitMergeEnabled(TabletConfig)) {
if (!PartitionsScaleManager) {
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, DatabasePath, PathId, Version, TabletConfig);
PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Topic, Path, DatabasePath, PathId, Version, TabletConfig);
} else {
PartitionsScaleManager->UpdateBalancerConfig(PathId, Version, TabletConfig);
}
Expand Down Expand Up @@ -1266,16 +1266,20 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&

void TPersQueueReadBalancer::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx) {
if (!SplitMergeEnabled(TabletConfig)) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: autopartitioning disabled.");
return;
}
auto& record = ev->Get()->Record;
auto* node = PartitionGraph.GetPartition(record.GetPartitionId());
if (!node) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: partition " << record.GetPartitionId() << " not found.");
return;
}

if (PartitionsScaleManager) {
PartitionsScaleManager->HandleScaleStatusChange(record.GetPartitionId(), record.GetScaleStatus(), ctx);
} else {
LOG_NOTICE_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "Skip TEvPartitionScaleStatusChanged: scale manager isn`t initialized.");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/read_balancer__txinit.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);

if (SplitMergeEnabled(Self->TabletConfig)) {
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
Self->PartitionsScaleManager = std::make_unique<TPartitionScaleManager>(Self->Topic, Self->Path, Self->DatabasePath, Self->PathId, Self->Version, Self->TabletConfig);
}
Self->UpdateConfigCounters();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ TTopicSdkTestSetup CreateSetup() {
NKikimrConfig::TFeatureFlags ff;
ff.SetEnableTopicSplitMerge(true);
ff.SetEnablePQConfigTransactionsAtSchemeShard(true);
//ff.SetEnableTopicServiceTx(true);
ff.SetEnableTopicServiceTx(true);

auto settings = TTopicSdkTestSetup::MakeServerSettings();
settings.SetFeatureFlags(ff);
Expand Down
54 changes: 54 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,60 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
}

void ExecuteQuery(NYdb::NTable::TSession& session, const TString& query ) {
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}

Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
auto client = setup.MakeClient();
auto tableClient = setup.MakeTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();

setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");

ExecuteQuery(session, R"(
--!syntax_v1
CREATE TOPIC `/Root/dir/origin`
WITH (
AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
MAX_ACTIVE_PARTITIONS = 50
);
)");

{
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
}

ui64 balancerTabletId;
{
auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/dir/origin")->Record.GetPathDescription().GetSelf();
balancerTabletId = pathDescr.GetBalancerTabletID();
Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
UNIT_ASSERT(balancerTabletId);
}

{
const auto edge = setup.GetRuntime().AllocateEdgeActor();
setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
}

{
size_t partitionCount = 0;
for (size_t i = 0; i < 10; ++i) {
Sleep(TDuration::Seconds(1));
auto describe = client.DescribeTopic("/Root/dir/origin").GetValueSync();
partitionCount = describe.GetTopicDescription().GetPartitions().size();
if (partitionCount == 3) {
break;
}
}
UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
}
}

Y_UNIT_TEST(MidOfRange) {
auto AsString = [](std::vector<ui16> vs) {
TStringBuilder a;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,9 @@ TTopicClient TTopicSdkTestSetup::MakeClient() const
{
return TTopicClient(MakeDriver());
}

NYdb::NTable::TTableClient TTopicSdkTestSetup::MakeTableClient() const
{
return NYdb::NTable::TTableClient(MakeDriver(), NYdb::NTable::TClientSettings()
.UseQueryCache(false));
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TTopicSdkTestSetup {
TLog& GetLog();

TTopicClient MakeClient() const;
NYdb::NTable::TTableClient MakeTableClient() const;

TDriver MakeDriver() const;
TDriver MakeDriver(const TDriverConfig& config) const;
Expand Down
Loading