Skip to content

Commit d8d22ec

Browse files
authored
Enable/disable autoscaling for the topic (#5792)
1 parent 789f39c commit d8d22ec

12 files changed

+338
-64
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

+65-14
Original file line number | Diff line number | Diff line change
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
3737

3838
void SimpleTest(bool autoscaleAwareSDK) {
3939
TTopicSdkTestSetup setup = CreateSetup();
40-
setup.CreateTopic();
40+
setup.CreateTopicWithAutoscale();
4141

4242
TTopicClient client = setup.MakeClient();
4343

@@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
7979

8080
void ReadingAfterSplitTest(bool autoscaleAwareSDK, bool autoCommit) {
8181
TTopicSdkTestSetup setup = CreateSetup();
82-
setup.CreateTopic();
82+
setup.CreateTopicWithAutoscale();
8383

8484
TTopicClient client = setup.MakeClient();
8585

@@ -133,7 +133,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
133133

134134
void ReadingAfterSplitTest_PreferedPartition(bool autoscaleAwareSDK) {
135135
TTopicSdkTestSetup setup = CreateSetup();
136-
setup.CreateTopic();
136+
setup.CreateTopicWithAutoscale();
137137

138138
TTopicClient client = setup.MakeClient();
139139

@@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
179179

180180
Y_UNIT_TEST(PartitionSplit_BeforeAutoscaleAwareSDK) {
181181
TTopicSdkTestSetup setup = CreateSetup();
182-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
182+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
183183

184184
TTopicClient client = setup.MakeClient();
185185

@@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
226226

227227
Y_UNIT_TEST(PartitionSplit_AutoscaleAwareSDK) {
228228
TTopicSdkTestSetup setup = CreateSetup();
229-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
229+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
230230

231231
TTopicClient client = setup.MakeClient();
232232

@@ -271,7 +271,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
271271

272272
void PartitionSplit_PreferedPartition(bool autoscaleAwareSDK) {
273273
TTopicSdkTestSetup setup = CreateSetup();
274-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
274+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
275275

276276
TTopicClient client = setup.MakeClient();
277277

@@ -348,7 +348,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
348348

349349
void PartitionMerge_PreferedPartition(bool autoscaleAwareSDK) {
350350
TTopicSdkTestSetup setup = CreateSetup();
351-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2, 100);
351+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 2, 100);
352352

353353
TTopicClient client = setup.MakeClient();
354354

@@ -416,7 +416,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
416416

417417
void PartitionSplit_ReadEmptyPartitions(bool autoscaleAwareSDK) {
418418
TTopicSdkTestSetup setup = CreateSetup();
419-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
419+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
420420

421421
TTopicClient client = setup.MakeClient();
422422
TTestReadSession readSession("session-0", client, Max<size_t>(), false, {}, autoscaleAwareSDK);
@@ -441,7 +441,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
441441

442442
Y_UNIT_TEST(PartitionSplit_ReadNotEmptyPartitions_BeforeAutoscaleAwareSDK) {
443443
TTopicSdkTestSetup setup = CreateSetup();
444-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
444+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
445445

446446
TTopicClient client = setup.MakeClient();
447447
TTestReadSession readSession("Session-0", client, Max<size_t>(), false, {}, false);
@@ -468,7 +468,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
468468

469469
Y_UNIT_TEST(PartitionSplit_ReadNotEmptyPartitions_AutoscaleAwareSDK) {
470470
TTopicSdkTestSetup setup = CreateSetup();
471-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
471+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
472472

473473
TTopicClient client = setup.MakeClient();
474474
TTestReadSession readSession("Session-0", client, Max<size_t>(), false, {}, true);
@@ -489,7 +489,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
489489

490490
Y_UNIT_TEST(PartitionSplit_ManySession_BeforeAutoscaleAwareSDK) {
491491
TTopicSdkTestSetup setup = CreateSetup();
492-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
492+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
493493

494494
TTopicClient client = setup.MakeClient();
495495

@@ -521,7 +521,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
521521

522522
Y_UNIT_TEST(PartitionSplit_ManySession_AutoscaleAwareSDK) {
523523
TTopicSdkTestSetup setup = CreateSetup();
524-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
524+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
525525

526526
TTopicClient client = setup.MakeClient();
527527

@@ -566,7 +566,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
566566

567567
Y_UNIT_TEST(PartitionSplit_ManySession_existed_AutoscaleAwareSDK) {
568568
TTopicSdkTestSetup setup = CreateSetup();
569-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
569+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
570570

571571
TTopicClient client = setup.MakeClient();
572572

@@ -594,7 +594,7 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
594594

595595
Y_UNIT_TEST(CommitTopPast_BeforeAutoscaleAwareSDK) {
596596
TTopicSdkTestSetup setup = CreateSetup();
597-
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);
597+
setup.CreateTopicWithAutoscale(TEST_TOPIC, TEST_CONSUMER, 1, 100);
598598

599599
TTopicClient client = setup.MakeClient();
600600

@@ -692,6 +692,57 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
692692
UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetAutoscalingSettings().GetThresholdTime().Seconds(), alterThreshold);
693693
}
694694

695+
// Checks the control-plane behaviour for switching auto-partitioning off on a
// topic that was created with the ScaleUp strategy:
//   1. an ALTER that only sets the strategy to Disabled must be rejected;
//   2. the same ALTER that additionally sets MaxActivePartitions to 0 must succeed.
Y_UNIT_TEST(ControlPlane_DisableAutoPartitioning) {
    auto topicName = "autoscalit-topic";

    TTopicSdkTestSetup setup = CreateSetup();
    TTopicClient client = setup.MakeClient();

    {
        // Create the topic with autoscaling enabled (1..100 active partitions).
        TCreateTopicSettings createSettings;
        createSettings
            .BeginConfigurePartitioningSettings()
                .MinActivePartitions(1)
                .MaxActivePartitions(100)
                .BeginConfigureAutoscalingSettings()
                    .Strategy(EAutoscalingStrategy::ScaleUp)
                .EndConfigureAutoscalingSettings()
            .EndConfigurePartitioningSettings();

        // The creation result has to be verified: if the topic were not created,
        // the "alter must fail" check below would pass for the wrong reason.
        auto created = client.CreateTopic(topicName, createSettings).GetValueSync();
        UNIT_ASSERT_C(created.IsSuccess(), "Error: " << created);
    }

    {
        // Disabling the strategy without touching MaxActivePartitions is expected to fail.
        TAlterTopicSettings alterSettings;
        alterSettings
            .BeginAlterPartitioningSettings()
                .BeginAlterAutoscalingSettings()
                    .Strategy(EAutoscalingStrategy::Disabled)
                .EndAlterAutoscalingSettings()
            .EndAlterTopicPartitioningSettings();

        // GetValueSync() blocks until the result is ready, so no separate Wait() is needed.
        auto v = client.AlterTopic(topicName, alterSettings).GetValueSync();
        UNIT_ASSERT_C(!v.IsSuccess(), "Must receve error becuse max-partition is not 0");
    }

    {
        // Disabling the strategy together with MaxActivePartitions(0) is expected to succeed.
        TAlterTopicSettings alterSettings;
        alterSettings
            .BeginAlterPartitioningSettings()
                .MaxActivePartitions(0)
                .BeginAlterAutoscalingSettings()
                    .Strategy(EAutoscalingStrategy::Disabled)
                .EndAlterAutoscalingSettings()
            .EndAlterTopicPartitioningSettings();

        auto v = client.AlterTopic(topicName, alterSettings).GetValueSync();
        UNIT_ASSERT_C(v.IsSuccess(), "Error: " << v);
    }
}
745+
695746
Y_UNIT_TEST(ControlPlane_AutoscalingWithStorageSizeRetention) {
696747
auto autoscalingTestTopic = "autoscalit-topic";
697748
TTopicSdkTestSetup setup = CreateSetup();

ydb/core/protos/pqconfig.proto

+6-6
Original file line number | Diff line number | Diff line change
@@ -409,14 +409,14 @@ message TPQTabletConfig {
409409
// Strategy for automatically changing the number of topic partitions depending on the load
410410
message TPartitionStrategy {
411411
// The minimum number of partitions that will be supported by the strategy
412-
required uint32 MinPartitionCount = 1 [default = 1];
412+
optional uint32 MinPartitionCount = 1 [default = 1];
413413
// The maximum number of partitions that will be supported by the strategy. The strategy will not create partitions if the specified
414414
// amount is reached, even if the load exceeds the current capabilities of the topic.
415-
required uint32 MaxPartitionCount = 2 [default = 1];;
416-
required uint32 ScaleThresholdSeconds = 3 [default = 300];
417-
required uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80];
418-
required uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20];
419-
required TPartitionStrategyType PartitionStrategyType = 6;
415+
optional uint32 MaxPartitionCount = 2 [default = 1];;
416+
optional uint32 ScaleThresholdSeconds = 3 [default = 300];
417+
optional uint32 ScaleUpPartitionWriteSpeedThresholdPercent = 4 [default = 80];
418+
optional uint32 ScaleDownPartitionWriteSpeedThresholdPercent = 5 [default = 20];
419+
required TPartitionStrategyType PartitionStrategyType = 6 [default = DISABLED];
420420
}
421421
optional TPartitionStrategy PartitionStrategy = 35;
422422

ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp

+73-18
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,12 @@
33
#include "schemeshard_impl.h"
44

55
#include <ydb/core/base/subdomain.h>
6-
#include <ydb/core/persqueue/config/config.h>
76
#include <ydb/core/mind/hive/hive.h>
7+
#include <ydb/core/persqueue/config/config.h>
8+
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
9+
#include <ydb/core/persqueue/utils.h>
10+
#include <ydb/services/lib/sharding/sharding.h>
11+
812

913
namespace {
1014

@@ -55,7 +59,9 @@ class TAlterPQ: public TSubOperation {
5559
const NKikimrSchemeOp::TPersQueueGroupDescription& alter,
5660
TString& errStr)
5761
{
58-
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge();
62+
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge()
63+
&& NPQ::SplitMergeEnabled(*tabletConfig)
64+
&& (!alter.HasPQTabletConfig() || !alter.GetPQTabletConfig().HasPartitionStrategy() || NPQ::SplitMergeEnabled(alter.GetPQTabletConfig()));
5965

6066
TTopicInfo::TPtr params = new TTopicInfo();
6167
const bool hasKeySchema = tabletConfig->PartitionKeySchemaSize();
@@ -64,6 +70,7 @@ class TAlterPQ: public TSubOperation {
6470
errStr = "Split and merge operations disabled";
6571
return nullptr;
6672
}
73+
6774
if (alter.SplitSize()) {
6875
for (const auto& split : alter.GetSplit()) {
6976
if (!split.HasPartition()) {
@@ -105,7 +112,7 @@ class TAlterPQ: public TSubOperation {
105112
params->TotalGroupCount = totalGroupCount;
106113
}
107114
}
108-
if (alter.HasPQTabletConfig() && alter.GetPQTabletConfig().HasPartitionStrategy()) {
115+
if (alter.HasPQTabletConfig() && alter.GetPQTabletConfig().HasPartitionStrategy() && NPQ::SplitMergeEnabled(alter.GetPQTabletConfig())) {
109116
const auto strategy = alter.GetPQTabletConfig().GetPartitionStrategy();
110117
if (strategy.GetMaxPartitionCount() < strategy.GetMinPartitionCount()) {
111118
errStr = Sprintf("Invalid min and max partition count specified: %u > %u", strategy.GetMinPartitionCount(), strategy.GetMaxPartitionCount());
@@ -168,6 +175,20 @@ class TAlterPQ: public TSubOperation {
168175
return nullptr;
169176
}
170177

178+
if (alterConfig.HasPartitionStrategy() && !NPQ::SplitMergeEnabled(alterConfig)
179+
&& tabletConfig->HasPartitionStrategy() && NPQ::SplitMergeEnabled(*tabletConfig)) {
180+
if (!alterConfig.GetPartitionStrategy().HasMaxPartitionCount() || 0 != alterConfig.GetPartitionStrategy().GetMaxPartitionCount()) {
181+
errStr = TStringBuilder() << "Can`t disable autoscaling. Disabling autoscaling is a destructive operation, "
182+
<< "after which all partitions will become active and the message order guarantee will be violated. "
183+
<< "If you are sure of this, then set max_active_partitions to 0.";
184+
return nullptr;
185+
}
186+
}
187+
188+
if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
189+
alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy());
190+
}
191+
171192
const TPathElement::TPtr dbRootEl = context.SS->PathsById.at(context.SS->RootPathId());
172193
if (dbRootEl->UserAttrs->Attrs.contains("cloud_id")) {
173194
auto cloudId = dbRootEl->UserAttrs->Attrs.at("cloud_id");
@@ -234,7 +255,9 @@ class TAlterPQ: public TSubOperation {
234255
ui64 shardsToCreate,
235256
const TChannelsBindings& rbChannelsBinding,
236257
const TChannelsBindings& pqChannelsBinding,
237-
TOperationContext& context)
258+
TOperationContext& context,
259+
const NKikimrPQ::TPQTabletConfig& tabletConfig,
260+
const NKikimrPQ::TPQTabletConfig& newTabletConfig)
238261
{
239262
TPathElement::TPtr item = path.Base();
240263
NIceDb::TNiceDb db(context.GetDB());
@@ -250,10 +273,44 @@ class TAlterPQ: public TSubOperation {
250273
context.SS->PersistUpdateNextShardIdx(db);
251274
}
252275

253-
for (auto& shard : pqGroup->Shards) {
254-
auto shardIdx = shard.first;
255-
for (const auto& pqInfo : shard.second->Partitions) {
256-
context.SS->PersistPersQueue(db, item->PathId, shardIdx, *pqInfo.Get());
276+
bool splitMergeWasDisabled = NKikimr::NPQ::SplitMergeEnabled(tabletConfig)
277+
&& !NKikimr::NPQ::SplitMergeEnabled(newTabletConfig);
278+
bool splitMergeWasEnabled = !NKikimr::NPQ::SplitMergeEnabled(tabletConfig)
279+
&& NKikimr::NPQ::SplitMergeEnabled(newTabletConfig);
280+
281+
if (splitMergeWasEnabled) {
282+
auto partitions = pqGroup->GetPartitions();
283+
284+
TString prevBound;
285+
for (size_t i = 0; i < partitions.size(); ++i) {
286+
auto* partitionInfo = partitions[i].second;
287+
if (i) {
288+
partitionInfo->KeyRange.ConstructInPlace();
289+
partitionInfo->KeyRange->FromBound = prevBound;
290+
}
291+
if (i != (partitions.size() - 1)) {
292+
if (!partitionInfo->KeyRange) {
293+
partitionInfo->KeyRange.ConstructInPlace();
294+
}
295+
auto range = NDataStreams::V1::RangeFromShardNumber(i, partitions.size());
296+
prevBound = NPQ::AsKeyBound(range.End);
297+
partitionInfo->KeyRange->ToBound = prevBound;
298+
}
299+
300+
context.SS->PersistPersQueue(db, item->PathId, partitions[i].first, *partitionInfo);
301+
}
302+
} else {
303+
for (auto& [shardIdx, tabletInfo] : pqGroup->Shards) {
304+
for (const auto& partitionInfo : tabletInfo->Partitions) {
305+
if (splitMergeWasDisabled) {
306+
// clear all splitmerge fields
307+
partitionInfo->Status = NKikimrPQ::ETopicPartitionStatus::Active;
308+
partitionInfo->KeyRange.Clear();
309+
partitionInfo->ParentPartitionIds.clear();
310+
partitionInfo->ChildPartitionIds.clear();
311+
}
312+
context.SS->PersistPersQueue(db, item->PathId, shardIdx, *partitionInfo.Get());
313+
}
257314
}
258315
}
259316

@@ -494,12 +551,8 @@ class TAlterPQ: public TSubOperation {
494551
return result;
495552
}
496553

497-
NKikimrPQ::TPQTabletConfig tabletConfig, newTabletConfig;
498-
if (!topic->TabletConfig.empty()) {
499-
bool parseOk = ParseFromStringNoSizeLimit(tabletConfig, topic->TabletConfig);
500-
Y_ABORT_UNLESS(parseOk, "Previously serialized pq tablet config cannot be parsed");
501-
}
502-
newTabletConfig = tabletConfig;
554+
NKikimrPQ::TPQTabletConfig tabletConfig = topic->GetTabletConfig();
555+
NKikimrPQ::TPQTabletConfig newTabletConfig = tabletConfig;
503556

504557
TTopicInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
505558

@@ -524,9 +577,11 @@ class TAlterPQ: public TSubOperation {
524577
return result;
525578
}
526579

527-
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge();
528-
if (splitMergeEnabled) {
580+
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge()
581+
&& NKikimr::NPQ::SplitMergeEnabled(tabletConfig)
582+
&& NKikimr::NPQ::SplitMergeEnabled(newTabletConfig);
529583

584+
if (splitMergeEnabled) {
530585
auto Hex = [](const auto& value) {
531586
return HexText(TBasicStringBuf(value));
532587
};
@@ -789,8 +844,8 @@ class TAlterPQ: public TSubOperation {
789844
}
790845

791846
topic->PrepareAlter(alterData);
792-
const TTxState& txState =
793-
PrepareChanges(OperationId, path, topic, shardsToCreate, tabletChannelsBinding, pqChannelsBinding, context);
847+
const TTxState& txState = PrepareChanges(OperationId, path, topic, shardsToCreate, tabletChannelsBinding,
848+
pqChannelsBinding, context, tabletConfig, newTabletConfig);
794849

795850
context.OnComplete.ActivateTx(OperationId);
796851
context.SS->ClearDescribePathCaches(path.Base());

ydb/core/tx/schemeshard/schemeshard__operation_common.cpp

+4
Original file line number | Diff line number | Diff line change
@@ -881,6 +881,10 @@ void TPropose::PersistState(const TTxState& txState,
881881
context.OnComplete.PublishToSchemeBoard(OperationId, PathId);
882882

883883
TTopicInfo::TPtr pqGroup = context.SS->Topics[PathId];
884+
885+
NKikimrPQ::TPQTabletConfig tabletConfig = pqGroup->GetTabletConfig();
886+
NKikimrPQ::TPQTabletConfig newTabletConfig = pqGroup->AlterData->GetTabletConfig();
887+
884888
pqGroup->FinishAlter();
885889

886890
context.SS->PersistPersQueueGroup(db, PathId, pqGroup);

ydb/core/tx/schemeshard/schemeshard__operation_create_pq.cpp

+2-1
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
#include <ydb/core/engine/mkql_proto.h>
99
#include <ydb/core/persqueue/config/config.h>
1010
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
11+
#include <ydb/core/persqueue/utils.h>
1112
#include <ydb/core/mind/hive/hive.h>
1213
#include <ydb/services/lib/sharding/sharding.h>
1314

@@ -103,7 +104,7 @@ TTopicInfo::TPtr CreatePersQueueGroup(TOperationContext& context,
103104
}
104105
}
105106

106-
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge();
107+
bool splitMergeEnabled = AppData()->FeatureFlags.GetEnableTopicSplitMerge() && NKikimr::NPQ::SplitMergeEnabled(op.GetPQTabletConfig());
107108

108109
TString prevBound;
109110
for (ui32 i = 0; i < partitionCount; ++i) {

0 commit comments

Comments
 (0)