Skip to content

Commit f360d3b

Browse files
committed
Forbid autoscaling with retention by size
1 parent 4878a53 commit f360d3b

File tree

2 files changed

+75
-41
lines changed

2 files changed

+75
-41
lines changed

ydb/core/persqueue/ut/autoscaling_ut.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,36 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
571571
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
572572
}
573573

574+
Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) {
575+
auto autoscalingTestTopic = "autoscalit-topic";
576+
TTopicSdkTestSetup setup = CreateSetup();
577+
TTopicClient client = setup.MakeClient();
578+
579+
TCreateTopicSettings createSettings;
580+
createSettings
581+
.RetentionStorageMb(1024)
582+
.BeginConfigurePartitioningSettings()
583+
.BeginConfigureAutoscalingSettings()
584+
.Strategy(EAutoscalingStrategy::ScaleUp)
585+
.EndConfigureAutoscalingSettings()
586+
.EndConfigurePartitioningSettings();
587+
auto result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();
588+
589+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
590+
591+
createSettings.RetentionStorageMb(0);
592+
result = client.CreateTopic(autoscalingTestTopic, createSettings).GetValueSync();
593+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
594+
595+
TAlterTopicSettings alterSettings;
596+
alterSettings
597+
.SetRetentionStorageMb(1024);
598+
599+
result = client.AlterTopic(autoscalingTestTopic, alterSettings).GetValueSync();
600+
601+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::BAD_REQUEST);
602+
}
603+
574604
Y_UNIT_TEST(PartitionSplit_AutosplitByLoad) {
575605
TTopicSdkTestSetup setup = CreateSetup();
576606
TTopicClient client = setup.MakeClient();

ydb/services/lib/actors/pq_schema_actor.cpp

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -717,6 +717,10 @@ namespace NKikimr::NGRpcProxy::V1 {
717717
error = TStringBuilder() << "Partition scale threshold time must be greater then 1 second, provided " << strategy.GetScaleThresholdSeconds() << " seconds";
718718
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
719719
}
720+
if (strategy.GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED && config.GetPartitionConfig().HasStorageLimitBytes()) {
721+
error = TStringBuilder() << "Partitions autoscaling is incompatible with retention storage bytes option";
722+
return TYdbPqCodes(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::VALIDATION_ERROR);
723+
}
720724

721725
return std::nullopt;
722726
}
@@ -736,6 +740,29 @@ namespace NKikimr::NGRpcProxy::V1 {
736740
auto minParts = 1;
737741
auto* pqTabletConfig = pqDescr->MutablePQTabletConfig();
738742
auto partConfig = pqTabletConfig->MutablePartitionConfig();
743+
744+
switch (settings.retention_case()) {
745+
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
746+
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
747+
}
748+
break;
749+
750+
case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
751+
if (settings.retention_storage_bytes() <= 0) {
752+
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
753+
settings.retention_storage_bytes();
754+
return Ydb::StatusIds::BAD_REQUEST;
755+
}
756+
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
757+
}
758+
break;
759+
760+
default: {
761+
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
762+
return Ydb::StatusIds::BAD_REQUEST;
763+
}
764+
}
765+
739766
if (settings.has_partitions_count()) {
740767
if (settings.partitions_count() > 0) {
741768
minParts = settings.partitions_count();
@@ -816,28 +843,6 @@ namespace NKikimr::NGRpcProxy::V1 {
816843
partConfig->SetMaxSizeInPartition(settings.max_partition_storage_size() ? settings.max_partition_storage_size() : Max<i64>());
817844
partConfig->SetMaxCountInPartition(Max<i32>());
818845

819-
switch (settings.retention_case()) {
820-
case Ydb::PersQueue::V1::TopicSettings::kRetentionPeriodMs: {
821-
partConfig->SetLifetimeSeconds(Max(settings.retention_period_ms() / 1000ll, 1ll));
822-
}
823-
break;
824-
825-
case Ydb::PersQueue::V1::TopicSettings::kRetentionStorageBytes: {
826-
if (settings.retention_storage_bytes() <= 0) {
827-
error = TStringBuilder() << "retention_storage_bytes must be positive, provided " <<
828-
settings.retention_storage_bytes();
829-
return Ydb::StatusIds::BAD_REQUEST;
830-
}
831-
partConfig->SetStorageLimitBytes(settings.retention_storage_bytes());
832-
}
833-
break;
834-
835-
default: {
836-
error = TStringBuilder() << "retention_storage_bytes or retention_period_ms should be set";
837-
return Ydb::StatusIds::BAD_REQUEST;
838-
}
839-
}
840-
841846
if (settings.message_group_seqno_retention_period_ms() > 0 && settings.message_group_seqno_retention_period_ms() < settings.retention_period_ms()) {
842847
error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")";
843848
return Ydb::StatusIds::BAD_REQUEST;
@@ -1077,6 +1082,10 @@ namespace NKikimr::NGRpcProxy::V1 {
10771082

10781083
auto pqTabletConfig = pqDescr->MutablePQTabletConfig();
10791084
auto partConfig = pqTabletConfig->MutablePartitionConfig();
1085+
1086+
if (request.retention_storage_mb())
1087+
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);
1088+
10801089
if (request.has_partitioning_settings()) {
10811090
const auto& settings = request.partitioning_settings();
10821091
if (settings.min_active_partitions() > 0) {
@@ -1162,9 +1171,6 @@ namespace NKikimr::NGRpcProxy::V1 {
11621171
partConfig->SetLifetimeSeconds(TDuration::Days(1).Seconds());
11631172
}
11641173

1165-
if (request.retention_storage_mb())
1166-
partConfig->SetStorageLimitBytes(request.retention_storage_mb() * 1024 * 1024);
1167-
11681174
if (local) {
11691175
auto partSpeed = request.partition_write_speed_bytes_per_second();
11701176
if (partSpeed == 0) {
@@ -1237,9 +1243,16 @@ namespace NKikimr::NGRpcProxy::V1 {
12371243
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
12381244
NPQ::Migrate(*pqTabletConfig);
12391245
auto partConfig = pqTabletConfig->MutablePartitionConfig();
1240-
if (request.has_alter_partitioning_settings()) {
1241-
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();
1246+
auto splitMergeFeatureEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicSplitMerge();
12421247

1248+
if (request.has_set_retention_storage_mb()) {
1249+
CHECK_CDC;
1250+
partConfig->ClearStorageLimitBytes();
1251+
if (request.set_retention_storage_mb())
1252+
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
1253+
}
1254+
1255+
if (request.has_alter_partitioning_settings()) {
12431256
const auto& settings = request.alter_partitioning_settings();
12441257
if (settings.has_set_min_active_partitions()) {
12451258
auto minParts = IfEqualThenDefault(settings.set_min_active_partitions(), 0L, 1L);
@@ -1248,6 +1261,7 @@ namespace NKikimr::NGRpcProxy::V1 {
12481261
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
12491262
}
12501263
}
1264+
12511265
if (splitMergeFeatureEnabled) {
12521266
if (settings.has_set_max_active_partitions()) {
12531267
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
@@ -1278,11 +1292,12 @@ namespace NKikimr::NGRpcProxy::V1 {
12781292
}
12791293
}
12801294
}
1281-
if (auto code = ValidatePartitionStrategy(*pqTabletConfig, error); code) {
1282-
return code->YdbCode;
1283-
}
12841295
}
1296+
}
12851297

1298+
if (splitMergeFeatureEnabled) {
1299+
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
1300+
if (code) return code->YdbCode;
12861301
}
12871302

12881303
if (request.alter_attributes().size()) {
@@ -1299,14 +1314,6 @@ namespace NKikimr::NGRpcProxy::V1 {
12991314
partConfig->SetLifetimeSeconds(request.set_retention_period().seconds());
13001315
}
13011316

1302-
1303-
if (request.has_set_retention_storage_mb()) {
1304-
CHECK_CDC;
1305-
partConfig->ClearStorageLimitBytes();
1306-
if (request.set_retention_storage_mb())
1307-
partConfig->SetStorageLimitBytes(request.set_retention_storage_mb() * 1024 * 1024);
1308-
}
1309-
13101317
bool local = true; //todo: check locality
13111318
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
13121319
if (request.has_set_partition_write_speed_bytes_per_second()) {
@@ -1434,7 +1441,4 @@ namespace NKikimr::NGRpcProxy::V1 {
14341441

14351442
return CheckConfig(*pqTabletConfig, supportedClientServiceTypes, error, ctx, Ydb::StatusIds::ALREADY_EXISTS);
14361443
}
1437-
1438-
1439-
14401444
}

0 commit comments

Comments
 (0)