Skip to content

Commit 0a916d0

Browse files
authored
Autoscaling control plane (#4486)
1 parent b098b52 commit 0a916d0

17 files changed

+616
-158
lines changed

ydb/core/persqueue/partition_scale_manager.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ namespace NPQ {
55

66

77
TPartitionScaleManager::TPartitionScaleManager(
8-
const TString& topicName,
9-
const TString& databasePath,
8+
const TString& topicName,
9+
const TString& databasePath,
1010
NKikimrPQ::TUpdateBalancerConfig& balancerConfig
1111
)
1212
: TopicName(topicName)
@@ -34,7 +34,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) {
3434
if (splitMergePair.first.empty() && splitMergePair.second.empty()) {
3535
return;
3636
}
37-
37+
3838
RequestInflight = true;
3939
CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest(
4040
TopicName,
@@ -55,7 +55,7 @@ std::pair<std::vector<TPartitionSplit>, std::vector<TPartitionMerge>> TPartition
5555
std::vector<TPartitionSplit> splitsToApply;
5656
std::vector<TPartitionMerge> mergesToApply;
5757

58-
size_t allowedSplitsCount = BalancerConfig.PartitionCountLimit > BalancerConfig.CurPartitions ? BalancerConfig.PartitionCountLimit - BalancerConfig.CurPartitions : 0;
58+
size_t allowedSplitsCount = BalancerConfig.MaxActivePartitions > BalancerConfig.CurPartitions ? BalancerConfig.MaxActivePartitions - BalancerConfig.CurPartitions : 0;
5959
auto itSplit = PartitionsToSplit.begin();
6060
while (allowedSplitsCount > 0 && itSplit != PartitionsToSplit.end()) {
6161
const auto partitionId = itSplit->first;

ydb/core/persqueue/partition_scale_manager.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -28,23 +28,23 @@ class TPartitionScaleManager {
2828
NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
2929
};
3030

31-
private:
31+
private:
3232
struct TBalancerConfig {
3333
TBalancerConfig(
3434
NKikimrPQ::TUpdateBalancerConfig& config
3535
)
3636
: PathId(config.GetPathId())
3737
, PathVersion(config.GetVersion())
3838
, PartitionGraph(MakePartitionGraph(config))
39-
, PartitionCountLimit(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
39+
, MaxActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMaxPartitionCount())
4040
, MinActivePartitions(config.GetTabletConfig().GetPartitionStrategy().GetMinPartitionCount())
4141
, CurPartitions(config.PartitionsSize()) {
4242
}
4343

4444
ui64 PathId;
4545
int PathVersion;
4646
TPartitionGraph PartitionGraph;
47-
ui64 PartitionCountLimit;
47+
ui64 MaxActivePartitions;
4848
ui64 MinActivePartitions;
4949
ui64 CurPartitions;
5050
};
@@ -59,7 +59,7 @@ class TPartitionScaleManager {
5959
void UpdateBalancerConfig(NKikimrPQ::TUpdateBalancerConfig& config);
6060
void UpdateDatabasePath(const TString& dbPath);
6161
void Die(const TActorContext& ctx);
62-
62+
6363
static TString GetRangeMid(const TString& from, const TString& to);
6464

6565
private:
@@ -74,7 +74,7 @@ class TPartitionScaleManager {
7474
private:
7575
static const ui32 MIN_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 10;
7676
static const ui32 MAX_SCALE_REQUEST_REPEAT_SECONDS_TIMEOUT = 1000;
77-
77+
7878
const TString TopicName;
7979
TString DatabasePath = "";
8080
TActorId CurrentScaleRequest;

ydb/core/persqueue/partition_scale_request.cpp

+17-13
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ namespace NKikimr {
44
namespace NPQ {
55

66
TPartitionScaleRequest::TPartitionScaleRequest(
7-
TString topicName,
8-
TString databasePath,
9-
ui64 pathId,
10-
ui64 pathVersion,
11-
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12-
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
7+
TString topicName,
8+
TString databasePath,
9+
ui64 pathId,
10+
ui64 pathVersion,
11+
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit> splits,
12+
const std::vector<NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge> merges,
1313
NActors::TActorId parentActorId
1414
)
1515
: Topic(topicName)
@@ -19,7 +19,7 @@ TPartitionScaleRequest::TPartitionScaleRequest(
1919
, Splits(splits)
2020
, Merges(merges)
2121
, ParentActorId(parentActorId) {
22-
22+
2323
}
2424

2525
void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) {
@@ -41,8 +41,8 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa
4141

4242
auto applyIf = modifyScheme.AddApplyIf();
4343
applyIf->SetPathId(PathId);
44-
applyIf->SetPathVersion(PathVersion);
45-
//applyIf->SetCheckGeneralVersion(false);
44+
applyIf->SetPathVersion(PathVersion == 0 ? 1 : PathVersion);
45+
applyIf->SetCheckEntityVersion(true);
4646

4747
NKikimrSchemeOp::TPersQueueGroupDescription groupDescription;
4848
groupDescription.SetName(topicName);
@@ -70,14 +70,14 @@ void TPartitionScaleRequest::PassAway() {
7070

7171
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
7272
if (ev->Get()->Status != NKikimrProto::OK) {
73-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
73+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
7474
Send(ParentActorId, scaleRequestResult.release());
7575
Die(ctx);
7676
}
7777
}
7878

7979
void TPartitionScaleRequest::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&, const TActorContext &ctx) {
80-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);//savnik: проверить, какой статус тут приходит
80+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ProxyShardNotAvailable);
8181
Send(ParentActorId, scaleRequestResult.release());
8282
Die(ctx);
8383
}
@@ -90,11 +90,15 @@ void TPartitionScaleRequest::Handle(NSchemeShard::TEvSchemeShard::TEvNotifyTxCom
9090

9191
void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const NActors::TActorContext& ctx) {
9292
auto msg = ev->Get();
93-
//Cerr << "SAVDBG" << msg->Record.GetIssues()[0].Getmessage(); //savnik: log err
9493

9594
auto status = static_cast<TEvTxUserProxy::TEvProposeTransactionStatus::EStatus>(msg->Record.GetStatus());
9695
if (status != TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) {
97-
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);//savnik: проверить, какой статус тут приходит
96+
auto scaleRequestResult = std::make_unique<TEvPartitionScaleRequestDone>(status);
97+
TStringBuilder issues;
98+
for (auto& issue : ev->Get()->Record.GetIssues()) {
99+
issues << issue.ShortDebugString() + ", ";
100+
}
101+
Cerr << "\n SAVDGB " << issues << "\n";
98102
Send(ParentActorId, scaleRequestResult.release());
99103
Die(ctx);
100104
} else {

ydb/core/persqueue/partition_write.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -541,9 +541,14 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) {
541541
NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) {
542542
auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed;
543543

544-
if (writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
544+
auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT
545+
|| Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
546+
547+
auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE;
548+
549+
if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) {
545550
return NKikimrPQ::EScaleStatus::NEED_SPLIT;
546-
} else if (writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
551+
} else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) {
547552
return NKikimrPQ::EScaleStatus::NEED_MERGE;
548553
}
549554
return NKikimrPQ::EScaleStatus::NORMAL;

ydb/core/persqueue/ut/autoscaling_ut.cpp

+98
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,104 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
498498
status = client.CommitOffset(TEST_TOPIC, 0, TEST_CONSUMER, 0).GetValueSync();
499499
UNIT_ASSERT_VALUES_EQUAL_C(NYdb::EStatus::BAD_REQUEST, status.GetStatus(), "The consumer cannot commit an offset for inactive, read-to-the-end partitions.");
500500
}
501+
502+
Y_UNIT_TEST(CreateAlterDescribe) {
503+
auto autoscalingTestTopic = "autoscalit-topic";
504+
TTopicSdkTestSetup setup = CreateSetup();
505+
TTopicClient client = setup.MakeClient();
506+
507+
auto minParts = 5;
508+
auto maxParts = 10;
509+
auto scaleUpPercent = 80;
510+
auto scaleDownPercent = 20;
511+
auto threshold = 500;
512+
auto strategy = EAutoscalingStrategy::ScaleUp;
513+
514+
TCreateTopicSettings createSettings;
515+
createSettings
516+
.BeginConfigurePartitioningSettings()
517+
.MinActivePartitions(minParts)
518+
.MaxActivePartitions(maxParts)
519+
.BeginConfigureAutoscalingSettings()
520+
.ScaleUpThresholdPercent(scaleUpPercent)
521+
.ScaleDownThresholdPercent(scaleDownPercent)
522+
.ThresholdTime(TDuration::Seconds(threshold))
523+
.Strategy(strategy)
524+
.EndConfigureAutoscalingSettings()
525+
.EndConfigurePartitioningSettings();
526+
client.CreateTopic(autoscalingTestTopic, createSettings).Wait();
527+
528+
TDescribeTopicSettings descSettings;
529+
530+
auto describe = client.DescribeTopic(autoscalingTestTopic, descSettings).GetValueSync();
531+
UNIT_ASSERT_VALUES_EQUAL_C(describe.GetStatus(), NYdb::EStatus::SUCCESS, describe.GetIssues().ToString());
532+
533+
534+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), minParts);
535+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), maxParts);
536+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), strategy);
537+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), scaleDownPercent);
538+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), scaleUpPercent);
539+
UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), threshold);
540+
541+
auto alterMinParts = 10;
542+
auto alterMaxParts = 20;
543+
auto alterScaleUpPercent = 90;
544+
auto alterScaleDownPercent = 10;
545+
auto alterThreshold = 700;
546+
auto alterStrategy = EAutoscalingStrategy::ScaleUpAndDown;
547+
548+
TAlterTopicSettings alterSettings;
549+
alterSettings
550+
.BeginAlterPartitioningSettings()
551+
.MinActivePartitions(alterMinParts)
552+
.MaxActivePartitions(alterMaxParts)
553+
.BeginAlterAutoscalingSettings()
554+
.ScaleDownThresholdPercent(alterScaleDownPercent)
555+
.ScaleUpThresholdPercent(alterScaleUpPercent)
556+
.ThresholdTime(TDuration::Seconds(alterThreshold))
557+
.Strategy(alterStrategy)
558+
.EndAlterAutoscalingSettings()
559+
.EndAlterTopicPartitioningSettings();
560+
561+
client.AlterTopic(autoscalingTestTopic, alterSettings).Wait();
562+
563+
auto describeAfterAlter = client.DescribeTopic(autoscalingTestTopic).GetValueSync();
564+
565+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), alterMinParts);
566+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMaxActivePartitions(), alterMaxParts);
567+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetStrategy(), alterStrategy);
568+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleDownThresholdPercent(), alterScaleDownPercent);
569+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetScaleUpThresholdPercent(), alterScaleUpPercent);
570+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
571+
}
572+
573+
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
574+
TTopicSdkTestSetup setup = CreateSetup();
575+
TTopicClient client = setup.MakeClient();
576+
577+
TCreateTopicSettings createSettings;
578+
createSettings
579+
.BeginConfigurePartitioningSettings()
580+
.MinActivePartitions(1)
581+
.MaxActivePartitions(100)
582+
.BeginConfigureAutoscalingSettings()
583+
.ScaleUpThresholdPercent(2)
584+
.ScaleDownThresholdPercent(1)
585+
.ThresholdTime(TDuration::Seconds(1))
586+
.Strategy(EAutoscalingStrategy::ScaleUp)
587+
.EndConfigureAutoscalingSettings()
588+
.EndConfigurePartitioningSettings();
589+
client.CreateTopic(TEST_TOPIC, createSettings).Wait();
590+
591+
auto msg = TString("a", 1_MB);
592+
auto writeSession = CreateWriteSession(client, "producer-1", 0);
593+
UNIT_ASSERT(writeSession->Write(Msg(msg, 1)));
594+
UNIT_ASSERT(writeSession->Write(Msg(msg, 2)));
595+
Sleep(TDuration::Seconds(5));
596+
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
597+
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
598+
}
501599
}
502600

503601
} // namespace NKikimr

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ TTopicSdkTestSetup CreateSetup() {
105105
return setup;
106106
}
107107

108-
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition) {
108+
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition, TString topic) {
109109
auto writeSettings = TWriteSessionSettings()
110-
.Path(TEST_TOPIC)
110+
.Path(topic)
111111
.ProducerId(producer);
112112
if (partition) {
113113
writeSettings.PartitionId(*partition);

ydb/core/persqueue/ut/common/autoscaling_ut_common.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ TWriteMessage Msg(const TString& data, ui64 seqNo);
2727

2828
TTopicSdkTestSetup CreateSetup();
2929

30-
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt);
30+
std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& client, const TString& producer, std::optional<ui32> partition = std::nullopt, TString topic = TEST_TOPIC);
3131

3232
struct TTestReadSession {
3333
struct MsgInfo {

0 commit comments

Comments
 (0)