Skip to content

Commit 429745b

Browse files
niksavelievuzhastik
authored andcommitted
Topics alter fix to 24-3 (#9755)
1 parent 42672e6 commit 429745b

File tree

5 files changed

+198
-14
lines changed

5 files changed

+198
-14
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
727727
}
728728
}
729729

730+
Y_UNIT_TEST(ControlPlane_BackCompatibility) {
731+
auto topicName = "back-compatibility-test";
732+
733+
TTopicSdkTestSetup setup = CreateSetup();
734+
TTopicClient client = setup.MakeClient();
735+
736+
{
737+
TCreateTopicSettings createSettings;
738+
createSettings
739+
.BeginConfigurePartitioningSettings()
740+
.MinActivePartitions(3)
741+
.EndConfigurePartitioningSettings();
742+
client.CreateTopic(topicName, createSettings).Wait();
743+
}
744+
745+
{
746+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
747+
748+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3);
749+
}
750+
751+
{
752+
TAlterTopicSettings alterSettings;
753+
alterSettings
754+
.BeginAlterPartitioningSettings()
755+
.MinActivePartitions(5)
756+
.EndAlterTopicPartitioningSettings();
757+
client.AlterTopic(topicName, alterSettings).Wait();
758+
}
759+
760+
{
761+
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();
762+
763+
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5);
764+
}
765+
}
766+
730767
Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) {
731768
auto topicName = "autoscalit-topic";
732769

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,14 +471,7 @@ namespace NKikimr::NDataStreams::V1 {
471471
Y_UNUSED(selfInfo);
472472

473473
TString error;
474-
if (!GetProtoRequest()->has_partitioning_settings()) {
475-
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
476-
{
477-
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
478-
}
479474

480-
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
481-
}
482475
switch (GetProtoRequest()->retention_case()) {
483476
case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours:
484477
groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds(
@@ -520,7 +513,19 @@ namespace NKikimr::NDataStreams::V1 {
520513
}
521514
}
522515

523-
if (GetProtoRequest()->has_partitioning_settings()) {
516+
if (!GetProtoRequest()->has_partitioning_settings() ||
517+
(GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() &&
518+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) ||
519+
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) {
520+
521+
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
522+
{
523+
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
524+
}
525+
526+
groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
527+
528+
} else {
524529
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
525530
if (!r.empty()) {
526531
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);

ydb/services/datastreams/datastreams_ut.cpp

Lines changed: 121 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2706,7 +2706,6 @@ Y_UNIT_TEST_SUITE(DataStreams) {
27062706

27072707
TString streamName = "test-topic";
27082708
TString streamName2 = "test-topic-2";
2709-
27102709
{
27112710
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
27122711
auto settings = NYdb::NTopic::TCreateTopicSettings()
@@ -2807,6 +2806,127 @@ Y_UNIT_TEST_SUITE(DataStreams) {
28072806
UNIT_ASSERT_VALUES_EQUAL(description.shards(4).parent_shard_id(), "shard-000001");
28082807
}
28092808

2809+
auto streamForAlterTest = "stream-alter-test";
2810+
{
2811+
auto result = testServer.DataStreamsClient->CreateStream(streamForAlterTest,
2812+
NYDS_V1::TCreateStreamSettings()
2813+
.ShardCount(3)
2814+
).ExtractValueSync();
2815+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2816+
if (result.GetStatus() != EStatus::SUCCESS) {
2817+
result.GetIssues().PrintTo(Cerr);
2818+
}
2819+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2820+
}
2821+
2822+
{
2823+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2824+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2825+
Cerr << result.GetIssues().ToString() << "\n";
2826+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2827+
2828+
auto& d = result.GetResult().stream_description();
2829+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 3);
2830+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2831+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2832+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2833+
2834+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2835+
}
2836+
2837+
{
2838+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2839+
NYDS_V1::TUpdateStreamSettings()
2840+
.TargetShardCount(5)
2841+
.BeginConfigurePartitioningSettings()
2842+
.BeginConfigureAutoPartitioningSettings()
2843+
.Strategy(NYdb::NDataStreams::V1::EAutoPartitioningStrategy::Disabled)
2844+
.EndConfigureAutoPartitioningSettings()
2845+
.EndConfigurePartitioningSettings()
2846+
).ExtractValueSync();
2847+
2848+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2849+
if (result.GetStatus() != EStatus::SUCCESS) {
2850+
result.GetIssues().PrintTo(Cerr);
2851+
}
2852+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2853+
}
2854+
2855+
{
2856+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2857+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2858+
Cerr << result.GetIssues().ToString() << "\n";
2859+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2860+
2861+
auto& d = result.GetResult().stream_description();
2862+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 5);
2863+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2864+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2865+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2866+
2867+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2868+
}
2869+
2870+
{
2871+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2872+
NYDS_V1::TUpdateStreamSettings()
2873+
.TargetShardCount(10)
2874+
).ExtractValueSync();
2875+
2876+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2877+
if (result.GetStatus() != EStatus::SUCCESS) {
2878+
result.GetIssues().PrintTo(Cerr);
2879+
}
2880+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2881+
}
2882+
2883+
{
2884+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2885+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2886+
Cerr << result.GetIssues().ToString() << "\n";
2887+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2888+
2889+
auto& d = result.GetResult().stream_description();
2890+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10);
2891+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2892+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2893+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2894+
2895+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2896+
}
2897+
2898+
{
2899+
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
2900+
NYDS_V1::TUpdateStreamSettings()
2901+
.TargetShardCount(15)
2902+
.BeginConfigurePartitioningSettings()
2903+
.BeginConfigureAutoPartitioningSettings()
2904+
.EndConfigureAutoPartitioningSettings()
2905+
.EndConfigurePartitioningSettings()
2906+
).ExtractValueSync();
2907+
2908+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2909+
if (result.GetStatus() != EStatus::SUCCESS) {
2910+
result.GetIssues().PrintTo(Cerr);
2911+
}
2912+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2913+
}
2914+
2915+
{
2916+
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
2917+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2918+
Cerr << result.GetIssues().ToString() << "\n";
2919+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2920+
2921+
auto& d = result.GetResult().stream_description();
2922+
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15);
2923+
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
2924+
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
2925+
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);
2926+
2927+
UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
2928+
}
2929+
28102930
{
28112931
auto result = testServer.DataStreamsClient->CreateStream(streamName2,
28122932
NYDS_V1::TCreateStreamSettings()

ydb/services/lib/actors/pq_schema_actor.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1254,7 +1254,29 @@ namespace NKikimr::NGRpcProxy::V1 {
12541254
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
12551255
NPQ::Migrate(*pqTabletConfig);
12561256
auto partConfig = pqTabletConfig->MutablePartitionConfig();
1257-
auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge();
1257+
1258+
auto needHandleAutoPartitioning = false;
1259+
if (appData->FeatureFlags.GetEnableTopicSplitMerge()) {
1260+
1261+
auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() &&
1262+
request.alter_partitioning_settings().has_alter_auto_partitioning_settings() &&
1263+
request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy();
1264+
1265+
auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() &&
1266+
pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() &&
1267+
pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType();
1268+
1269+
if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) {
1270+
needHandleAutoPartitioning = true;
1271+
} else if (reqHasAutoPartitioningStrategyChange) {
1272+
auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy();
1273+
needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED ||
1274+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP ||
1275+
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
1276+
}
1277+
1278+
}
1279+
12581280

12591281
if (request.has_set_retention_storage_mb()) {
12601282
CHECK_CDC;
@@ -1268,12 +1290,12 @@ namespace NKikimr::NGRpcProxy::V1 {
12681290
if (settings.has_set_min_active_partitions()) {
12691291
auto minParts = IfEqualThenDefault<i64>(settings.set_min_active_partitions(), 0L, 1L);
12701292
pqDescr.SetTotalGroupCount(minParts);
1271-
if (splitMergeFeatureEnabled) {
1293+
if (needHandleAutoPartitioning) {
12721294
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
12731295
}
12741296
}
12751297

1276-
if (splitMergeFeatureEnabled) {
1298+
if (needHandleAutoPartitioning) {
12771299
if (settings.has_set_max_active_partitions()) {
12781300
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
12791301
}
@@ -1309,7 +1331,7 @@ namespace NKikimr::NGRpcProxy::V1 {
13091331
}
13101332
}
13111333

1312-
if (splitMergeFeatureEnabled) {
1334+
if (needHandleAutoPartitioning) {
13131335
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
13141336
if (code) return code->YdbCode;
13151337
}

ydb/services/persqueue_v1/actors/schema_actors.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
10941094
}
10951095

10961096
const auto &config = pqDescr.GetPQTabletConfig();
1097-
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) {
1097+
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
10981098
Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
10991099
} else {
11001100
Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());

0 commit comments

Comments
 (0)