@@ -38,6 +38,18 @@ namespace NYdb::NConsoleClient {
38
38
std::pair<NTopic::EMeteringMode, TString>(NTopic::EMeteringMode::RequestUnits, " Read/write operations valued in request units, storage usage on hourly basis." ),
39
39
};
40
40
41
+ THashMap<TString, NTopic::EAutoscalingStrategy> AutoscaleStrategies = {
42
+ std::pair<TString, NTopic::EAutoscalingStrategy>(" disabled" , NTopic::EAutoscalingStrategy::Disabled),
43
+ std::pair<TString, NTopic::EAutoscalingStrategy>(" up" , NTopic::EAutoscalingStrategy::ScaleUp),
44
+ std::pair<TString, NTopic::EAutoscalingStrategy>(" up-and-down" , NTopic::EAutoscalingStrategy::ScaleUpAndDown),
45
+ };
46
+
47
+ THashMap<NTopic::EAutoscalingStrategy, TString> AutoscaleStrategiesDescriptions = {
48
+ std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::Disabled, " Automatic scaling of the number of partitions is disabled" ),
49
+ std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::ScaleUp, " The number of partitions can increase under high load, but cannot decrease" ),
50
+ std::pair<NTopic::EAutoscalingStrategy, TString>(NTopic::EAutoscalingStrategy::ScaleUpAndDown, " The number of partitions can increase under high load and decrease under low load" ),
51
+ };
52
+
41
53
THashMap<ETopicMetadataField, TString> TopicMetadataFieldsDescriptions = {
42
54
{ETopicMetadataField::Body, " Message data" },
43
55
{ETopicMetadataField::WriteTime, " Message write time, a UNIX timestamp the message was written to server." },
@@ -172,6 +184,72 @@ namespace {
172
184
return MeteringMode_;
173
185
}
174
186
187
+ void TCommandWithAutoscaling::AddAutoscaling (TClientCommand::TConfig& config, bool withDefault) {
188
+ TStringStream description;
189
+ description << " A strategy to automatically change the number of partitions depending on the load. Available strategies: " ;
190
+ NColorizer::TColors colors = NColorizer::AutoColors (Cout);
191
+ for (const auto & strategy: AutoscaleStrategies) {
192
+ auto findResult = AutoscaleStrategiesDescriptions.find (strategy.second );
193
+ Y_ABORT_UNLESS (findResult != AutoscaleStrategiesDescriptions.end (),
194
+ " Couldn't find description for %s autoscale strategy" , (TStringBuilder () << strategy.second ).c_str ());
195
+ description << " \n " << colors.BoldColor () << strategy.first << colors.OldColor ()
196
+ << " \n " << findResult->second ;
197
+ if (strategy.second == NTopic::EAutoscalingStrategy::Disabled) {
198
+ description << colors.CyanColor () << " (default)" << colors.OldColor ();
199
+ }
200
+ }
201
+
202
+ auto straregy = config.Opts ->AddLongOption (" autoscale-strategy" , description.Str ())
203
+ .Optional ()
204
+ .StoreResult (&AutoscaleStrategy_);
205
+ auto thresholdTime = config.Opts ->AddLongOption (" autoscale-threshold-time" , " Duration in seconds of high or low load before automatically scale the number of partitions" )
206
+ .Optional ()
207
+ .StoreResult (&ScaleThresholdTime_);
208
+ auto upThresholdPercent = config.Opts ->AddLongOption (" autoscale-scale-up-threshold-percent" , " The load percentage at which the number of partitions will increase." )
209
+ .Optional ()
210
+ .StoreResult (&ScaleUpThresholdPercent_);
211
+ auto downThresholdPercent = config.Opts ->AddLongOption (" autoscale-scale-down-threshold-percent" , " The load percentage at which the number of partitions will decrease." )
212
+ .Optional ()
213
+ .StoreResult (&ScaleDownThresholdPercent_);
214
+
215
+ if (withDefault) {
216
+ straregy.DefaultValue (" disabled" );
217
+ thresholdTime.DefaultValue (300 );
218
+ upThresholdPercent.DefaultValue (90 );
219
+ downThresholdPercent.DefaultValue (30 );
220
+ }
221
+ }
222
+
223
+ void TCommandWithAutoscaling::ParseAutoscalingStrategy () {
224
+ if (AutoscalingStrategyStr_.empty ()) {
225
+ return ;
226
+ }
227
+
228
+ TString toLowerStrategy = to_lower (AutoscalingStrategyStr_);
229
+ auto strategyIt = AutoscaleStrategies.find (toLowerStrategy);
230
+ if (strategyIt.IsEnd ()) {
231
+ throw TMisuseException () << " Autoscaling strategy " << AutoscalingStrategyStr_ << " is not available for this command" ;
232
+ } else {
233
+ AutoscaleStrategy_ = strategyIt->second ;
234
+ }
235
+ }
236
+
237
+ TMaybe<NTopic::EAutoscalingStrategy> TCommandWithAutoscaling::GetAutoscalingStrategy () const {
238
+ return AutoscaleStrategy_;
239
+ }
240
+
241
+ TMaybe<ui32> TCommandWithAutoscaling::GetScaleThresholdTime () const {
242
+ return ScaleThresholdTime_;
243
+ }
244
+
245
+ TMaybe<ui32> TCommandWithAutoscaling::GetScaleUpThresholdPercent () const {
246
+ return ScaleUpThresholdPercent_;
247
+ }
248
+
249
+ TMaybe<ui32> TCommandWithAutoscaling::GetScaleDownThresholdPercent () const {
250
+ return ScaleDownThresholdPercent_;
251
+ }
252
+
175
253
TCommandTopic::TCommandTopic ()
176
254
: TClientCommandTree(" topic" , {}, " TopicService operations" ) {
177
255
AddCommand (std::make_unique<TCommandTopicCreate>());
@@ -188,9 +266,13 @@ namespace {
188
266
189
267
void TCommandTopicCreate::Config (TConfig& config) {
190
268
TYdbCommand::Config (config);
191
- config.Opts ->AddLongOption (" partitions-count" , " Total partitions count for topic" )
269
+ config.Opts ->AddLongOption (" partitions-count" , " Initial number of partitions for topic" )
270
+ .DefaultValue (1 )
271
+ .StoreResult (&MinActivePartitions_);
272
+ config.Opts ->AddLongOption (" max-partitions-count" , " Maximum number of partitions for topic" )
192
273
.DefaultValue (1 )
193
- .StoreResult (&PartitionsCount_);
274
+ .Optional ()
275
+ .StoreResult (&MaxActivePartitions_);
194
276
config.Opts ->AddLongOption (" retention-period-hours" , " Duration in hours for which data in topic is stored" )
195
277
.DefaultValue (24 )
196
278
.Optional ()
@@ -207,21 +289,30 @@ namespace {
207
289
SetFreeArgTitle (0 , " <topic-path>" , " Topic path" );
208
290
AddAllowedCodecs (config, AllowedCodecs);
209
291
AddAllowedMeteringModes (config);
292
+ AddAutoscaling (config, true );
210
293
}
211
294
212
295
void TCommandTopicCreate::Parse (TConfig& config) {
213
296
TYdbCommand::Parse (config);
214
297
ParseTopicName (config, 0 );
215
298
ParseCodecs ();
216
299
ParseMeteringMode ();
300
+ ParseAutoscalingStrategy ();
217
301
}
218
302
219
303
int TCommandTopicCreate::Run (TConfig& config) {
220
304
TDriver driver = CreateDriver (config);
221
305
NYdb::NTopic::TTopicClient topicClient (driver);
222
306
223
307
auto settings = NYdb::NTopic::TCreateTopicSettings ();
224
- settings.PartitioningSettings (PartitionsCount_, PartitionsCount_);
308
+
309
+ auto autoscaleSettings = NTopic::TAutoscalingSettings (
310
+ GetAutoscalingStrategy () ? *GetAutoscalingStrategy () : NTopic::EAutoscalingStrategy::Disabled,
311
+ GetScaleThresholdTime () ? TDuration::Seconds (*GetScaleThresholdTime ()) : TDuration::Seconds (0 ),
312
+ GetScaleUpThresholdPercent () ? *GetScaleUpThresholdPercent () : 0 ,
313
+ GetScaleDownThresholdPercent () ? *GetScaleDownThresholdPercent () : 0 );
314
+
315
+ settings.PartitioningSettings (MinActivePartitions_, MaxActivePartitions_, autoscaleSettings);
225
316
settings.PartitionWriteBurstBytes (PartitionWriteSpeedKbps_ * 1_KB);
226
317
settings.PartitionWriteSpeedBytesPerSecond (PartitionWriteSpeedKbps_ * 1_KB);
227
318
@@ -249,8 +340,11 @@ namespace {
249
340
250
341
void TCommandTopicAlter::Config (TConfig& config) {
251
342
TYdbCommand::Config (config);
252
- config.Opts ->AddLongOption (" partitions-count" , " Total partitions count for topic" )
253
- .StoreResult (&PartitionsCount_);
343
+ config.Opts ->AddLongOption (" partitions-count" , " Initial number of partitions for topic" )
344
+ .StoreResult (&MinActivePartitions_);
345
+ config.Opts ->AddLongOption (" max-partitions-count" , " Maximum number of partitions for topic" )
346
+ .Optional ()
347
+ .StoreResult (&MaxActivePartitions_);
254
348
config.Opts ->AddLongOption (" retention-period-hours" , " Duration for which data in topic is stored" )
255
349
.Optional ()
256
350
.StoreResult (&RetentionPeriodHours_);
@@ -264,6 +358,7 @@ namespace {
264
358
SetFreeArgTitle (0 , " <topic-path>" , " Topic path" );
265
359
AddAllowedCodecs (config, AllowedCodecs);
266
360
AddAllowedMeteringModes (config);
361
+ AddAutoscaling (config, false );
267
362
}
268
363
269
364
void TCommandTopicAlter::Parse (TConfig& config) {
@@ -276,9 +371,32 @@ namespace {
276
371
NYdb::NTopic::TAlterTopicSettings TCommandTopicAlter::PrepareAlterSettings (
277
372
NYdb::NTopic::TDescribeTopicResult& describeResult) {
278
373
auto settings = NYdb::NTopic::TAlterTopicSettings ();
374
+ auto partitioningSettings = settings.BeginAlterPartitioningSettings ();
375
+
376
+ if (MinActivePartitions_.Defined () && (*MinActivePartitions_ != describeResult.GetTopicDescription ().GetPartitioningSettings ().GetMinActivePartitions ())) {
377
+ partitioningSettings.MinActivePartitions (*MinActivePartitions_);
378
+ }
379
+
380
+ if (MaxActivePartitions_.Defined () && (*MaxActivePartitions_ != describeResult.GetTopicDescription ().GetPartitioningSettings ().GetMaxActivePartitions ())) {
381
+ partitioningSettings.MaxActivePartitions (*MaxActivePartitions_);
382
+ }
383
+
384
+ auto autoscalingSettings = partitioningSettings.BeginAlterAutoscalingSettings ();
385
+
386
+ if (GetScaleThresholdTime ().Defined () && *GetScaleThresholdTime () != describeResult.GetTopicDescription ().GetPartitioningSettings ().GetAutoscalingSettings ().GetThresholdTime ().Seconds ()) {
387
+ autoscalingSettings.ThresholdTime (TDuration::Seconds (*GetScaleThresholdTime ()));
388
+ }
389
+
390
+ if (GetAutoscalingStrategy ().Defined () && *GetAutoscalingStrategy () != describeResult.GetTopicDescription ().GetPartitioningSettings ().GetAutoscalingSettings ().GetStrategy ()) {
391
+ autoscalingSettings.Strategy (*GetAutoscalingStrategy ());
392
+ }
393
+
394
+ if (GetScaleDownThresholdPercent ().Defined () && *GetScaleDownThresholdPercent () != describeResult.GetTopicDescription ().GetPartitioningSettings ().GetAutoscalingSettings ().GetScaleDownThresholdPercent ()) {
395
+ autoscalingSettings.ScaleDownThresholdPercent (*GetScaleDownThresholdPercent ());
396
+ }
279
397
280
- if (PartitionsCount_ .Defined () && (*PartitionsCount_ != describeResult.GetTopicDescription ().GetTotalPartitionsCount () )) {
281
- settings. AlterPartitioningSettings (*PartitionsCount_, *PartitionsCount_ );
398
+ if (GetScaleUpThresholdPercent () .Defined () && * GetScaleUpThresholdPercent () != describeResult.GetTopicDescription ().GetPartitioningSettings (). GetAutoscalingSettings (). GetScaleUpThresholdPercent ( )) {
399
+ autoscalingSettings. ScaleUpThresholdPercent (* GetScaleUpThresholdPercent () );
282
400
}
283
401
284
402
auto codecs = GetCodecs ();
0 commit comments