Skip to content

Commit 2b0adf9

Browse files
committed
fix
1 parent a59ef51 commit 2b0adf9

File tree

4 files changed

+115
-11
lines changed

4 files changed

+115
-11
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

+37
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
757757
}
758758
}
759759

760+
Y_UNIT_TEST(ControlPlane_BackCompatibility) {
761+
auto topicName = "back-compatibility-test";
762+
763+
TTopicSdkTestSetup setup = CreateSetup();
764+
TTopicClient client = setup.MakeClient();
765+
766+
{
767+
TCreateTopicSettings createSettings;
768+
createSettings
769+
.BeginConfigurePartitioningSettings()
770+
.MinActivePartitions(3)
771+
.EndConfigurePartitioningSettings();
772+
client.CreateTopic(topicName, createSettings).Wait();
773+
}
774+
775+
{
776+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
777+
778+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3);
779+
}
780+
781+
{
782+
TAlterTopicSettings alterSettings;
783+
alterSettings
784+
.BeginAlterPartitioningSettings()
785+
.MinActivePartitions(5)
786+
.EndAlterTopicPartitioningSettings();
787+
client.AlterTopic(topicName, alterSettings).Wait();
788+
}
789+
790+
{
791+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
792+
793+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5);
794+
}
795+
}
796+
760797
Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) {
761798
auto topicName = "autoscalit-topic";
762799

ydb/services/datastreams/datastreams_proxy.cpp

+7-3
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,14 @@ namespace NKikimr::NDataStreams::V1 {
514514
}
515515

516516
if (!GetProtoRequest()->has_partitioning_settings() ||
517+
517518
(GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() &&
518-
GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() ==
519-
Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED)) {
519+
520+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() ==
521+
Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) ||
522+
523+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() ==
524+
Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) {
520525

521526
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
522527
{
@@ -537,7 +542,6 @@ namespace NKikimr::NDataStreams::V1 {
537542
auto& as = s.auto_partitioning_settings();
538543
switch(as.strategy()) {
539544
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED:
540-
break;
541545
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED:
542546
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MAX_SENTINEL_DO_NOT_USE_:
543547
case Ydb::DataStreams::V1::AutoPartitioningStrategy::AutoPartitioningStrategy_INT_MIN_SENTINEL_DO_NOT_USE_:

ydb/services/datastreams/datastreams_ut.cpp

+60
Original file line numberDiff line numberDiff line change
@@ -2867,6 +2867,66 @@ Y_UNIT_TEST_SUITE(DataStreams) {
28672867
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
28682868
}
28692869

2870+
{
2871+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2872+
NYDS_V1::TUpdateStreamSettings()
2873+
.TargetShardCount(10)
2874+
).ExtractValueSync();
2875+
2876+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2877+
if (result.GetStatus() != EStatus::SUCCESS) {
2878+
result.GetIssues().PrintTo(Cerr);
2879+
}
2880+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2881+
}
2882+
2883+
{
2884+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2885+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2886+
Cerr << result.GetIssues().ToString() << "\n";
2887+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2888+
2889+
auto& d = result.GetResult().stream_description();
2890+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10);
2891+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2892+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2893+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2894+
2895+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2896+
}
2897+
2898+
{
2899+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2900+
NYDS_V1::TUpdateStreamSettings()
2901+
.TargetShardCount(15)
2902+
.BeginConfigurePartitioningSettings()
2903+
.BeginConfigureAutoPartitioningSettings()
2904+
.EndConfigureAutoPartitioningSettings()
2905+
.EndConfigurePartitioningSettings()
2906+
).ExtractValueSync();
2907+
2908+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2909+
if (result.GetStatus() != EStatus::SUCCESS) {
2910+
result.GetIssues().PrintTo(Cerr);
2911+
}
2912+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2913+
}
2914+
2915+
{
2916+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2917+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2918+
Cerr << result.GetIssues().ToString() << "\n";
2919+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2920+
2921+
auto& d = result.GetResult().stream_description();
2922+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15);
2923+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2924+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2925+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2926+
2927+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2928+
}
2929+
28702930
{
28712931
auto result = testServer.DataStreamsClient->CreateStream(streamName2,
28722932
NYDS_V1::TCreateStreamSettings()

ydb/services/lib/actors/pq_schema_actor.cpp

+11-8
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ namespace NKikimr::NGRpcProxy::V1 {
11651165
NPQ::Migrate(*pqTabletConfig);
11661166
auto partConfig = pqTabletConfig->MutablePartitionConfig();
11671167

1168-
auto finalAutoPartitioningEnabled = false;
1168+
auto needHandleAutoPartitioning = false;
11691169
if (appData->FeatureFlags.GetEnableTopicSplitMerge()) {
11701170

11711171
auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() &&
@@ -1176,10 +1176,13 @@ namespace NKikimr::NGRpcProxy::V1 {
11761176
pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() &&
11771177
pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType();
11781178

1179-
if (reqHasAutoPartitioningStrategyChange) {
1180-
finalAutoPartitioningEnabled = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy() != ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED;
1181-
} else if (pqConfigHasAutoPartitioningStrategy) {
1182-
finalAutoPartitioningEnabled = pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
1179+
if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) {
1180+
needHandleAutoPartitioning = true;
1181+
} else if (reqHasAutoPartitioningStrategyChange) {
1182+
auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy();
1183+
needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED ||
1184+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP ||
1185+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
11831186
}
11841187

11851188
}
@@ -1197,12 +1200,12 @@ namespace NKikimr::NGRpcProxy::V1 {
11971200
if (settings.has_set_min_active_partitions()) {
11981201
auto minParts = IfEqualThenDefault<i64>(settings.set_min_active_partitions(), 0L, 1L);
11991202
pqDescr.SetTotalGroupCount(minParts);
1200-
if (finalAutoPartitioningEnabled) {
1203+
if (needHandleAutoPartitioning) {
12011204
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
12021205
}
12031206
}
12041207

1205-
if (finalAutoPartitioningEnabled) {
1208+
if (needHandleAutoPartitioning) {
12061209
if (settings.has_set_max_active_partitions()) {
12071210
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
12081211
}
@@ -1238,7 +1241,7 @@ namespace NKikimr::NGRpcProxy::V1 {
12381241
}
12391242
}
12401243

1241-
if (finalAutoPartitioningEnabled) {
1244+
if (needHandleAutoPartitioning) {
12421245
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
12431246
if (code) return code->YdbCode;
12441247
}

0 commit comments

Comments
 (0)