diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index d363acca852b..87747e44d3a3 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -757,6 +757,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { } } + Y_UNIT_TEST(ControlPlane_BackCompatibility) { + auto topicName = "back-compatibility-test"; + + TTopicSdkTestSetup setup = CreateSetup(); + TTopicClient client = setup.MakeClient(); + + { + TCreateTopicSettings createSettings; + createSettings + .BeginConfigurePartitioningSettings() + .MinActivePartitions(3) + .EndConfigurePartitioningSettings(); + client.CreateTopic(topicName, createSettings).Wait(); + } + + { + auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3); + } + + { + TAlterTopicSettings alterSettings; + alterSettings + .BeginAlterPartitioningSettings() + .MinActivePartitions(5) + .EndAlterTopicPartitioningSettings(); + client.AlterTopic(topicName, alterSettings).Wait(); + } + + { + auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5); + } + } + Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) { auto topicName = "autoscalit-topic"; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 48277a3c5ff0..4a92862a0c7c 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -471,14 +471,7 @@ namespace NKikimr::NDataStreams::V1 { Y_UNUSED(selfInfo); TString error; - if (!GetProtoRequest()->has_partitioning_settings()) { - if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) - { - return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::BAD_REQUEST), error); - } - groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); - } switch (GetProtoRequest()->retention_case()) { case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours: groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds( @@ -520,7 +513,19 @@ namespace NKikimr::NDataStreams::V1 { } } - if (GetProtoRequest()->has_partitioning_settings()) { + if (!GetProtoRequest()->has_partitioning_settings() || + (GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() && + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) || + (GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) { + + if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) + { + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::BAD_REQUEST), error); + } + + groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count()); + + } else { auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings()); if (!r.empty()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast(NYds::EErrorCodes::INVALID_ARGUMENT), r); diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 7d978c7515fc..4a7ae905c95b 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -2706,7 +2706,6 @@ Y_UNIT_TEST_SUITE(DataStreams) { TString streamName = "test-topic"; TString streamName2 = "test-topic-2"; - { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); auto settings = NYdb::NTopic::TCreateTopicSettings() @@ -2807,6 +2806,127 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL(description.shards(4).parent_shard_id(), "shard-000001"); } + auto streamForAlterTest = "stream-alter-test"; + { + auto result = testServer.DataStreamsClient->CreateStream(streamForAlterTest, + NYDS_V1::TCreateStreamSettings() + .ShardCount(3) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 3); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(5) + .BeginConfigurePartitioningSettings() + .BeginConfigureAutoPartitioningSettings() + .Strategy(NYdb::NDataStreams::V1::EAutoPartitioningStrategy::Disabled) + .EndConfigureAutoPartitioningSettings() + .EndConfigurePartitioningSettings() + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 5); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(10) + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + + { + auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest, + NYDS_V1::TUpdateStreamSettings() + .TargetShardCount(15) + .BeginConfigurePartitioningSettings() + .BeginConfigureAutoPartitioningSettings() + .EndConfigureAutoPartitioningSettings() + .EndConfigurePartitioningSettings() + ).ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + if (result.GetStatus() != EStatus::SUCCESS) { + result.GetIssues().PrintTo(Cerr); + } + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& d = result.GetResult().stream_description(); + UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15); + UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE); + UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest); + UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest); + + UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED); + } + { auto result = testServer.DataStreamsClient->CreateStream(streamName2, NYDS_V1::TCreateStreamSettings() diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index 05a2275b58e4..bbb641ac5625 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -1164,7 +1164,29 @@ namespace NKikimr::NGRpcProxy::V1 { auto pqTabletConfig = pqDescr.MutablePQTabletConfig(); NPQ::Migrate(*pqTabletConfig); auto partConfig = pqTabletConfig->MutablePartitionConfig(); - auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge(); + + auto needHandleAutoPartitioning = false; + if (appData->FeatureFlags.GetEnableTopicSplitMerge()) { + + auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() && + request.alter_partitioning_settings().has_alter_auto_partitioning_settings() && + request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy(); + + auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() && + pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() && + pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType(); + + if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) { + needHandleAutoPartitioning = true; + } else if (reqHasAutoPartitioningStrategyChange) { + auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy(); + needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED || + strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP || + strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN; + } + + } + if (request.has_set_retention_storage_mb()) { CHECK_CDC; @@ -1178,12 +1200,12 @@ namespace NKikimr::NGRpcProxy::V1 { if (settings.has_set_min_active_partitions()) { auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L); pqDescr.SetTotalGroupCount(minParts); - if (splitMergeFeatureEnabled) { + if (needHandleAutoPartitioning) { pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts); } } - if (splitMergeFeatureEnabled) { + if (needHandleAutoPartitioning) { if (settings.has_set_max_active_partitions()) { pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions()); } @@ -1219,7 +1241,7 @@ namespace NKikimr::NGRpcProxy::V1 { } } - if (splitMergeFeatureEnabled) { + if (needHandleAutoPartitioning) { auto code = ValidatePartitionStrategy(*pqTabletConfig, error); if (code) return code->YdbCode; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index c68c6fe205ad..63749dfc71a3 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } const auto &config = pqDescr.GetPQTabletConfig(); - if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) { + if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) { Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount()); } else { Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());