Skip to content

Commit 6f872bd

Browse files
authored
autopartitioning for kinesis (#6244)
1 parent 5baf70d commit 6f872bd

File tree

5 files changed

+510
-22
lines changed

5 files changed

+510
-22
lines changed

ydb/public/api/protos/draft/datastreams.proto

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ option cc_enable_arenas = true;
44
import "ydb/public/api/protos/ydb_operation.proto";
55

66
import "google/protobuf/descriptor.proto";
7+
import "google/protobuf/duration.proto";
78

89
package Ydb.DataStreams.V1;
910
option java_package = "com.yandex.ydb.datastreams.v1";
@@ -122,6 +123,8 @@ message StreamDescription {
122123

123124
// stream metering mode
124125
StreamModeDetails stream_mode_details = 14;
126+
127+
PartitioningSettings partitioning_settings = 15;
125128
}
126129

127130
// Represents range of possible sequence numbers for the shard
@@ -276,6 +279,53 @@ message StreamModeDetails {
276279
StreamMode stream_mode = 1;
277280
}
278281

282+
enum AutoPartitioningStrategy {
283+
// The auto partitioning algorithm is not specified. The default value will be used.
284+
AUTO_PARTITIONING_STRATEGY_UNSPECIFIED = 0;
285+
// The auto partitioning is disabled.
286+
AUTO_PARTITIONING_STRATEGY_DISABLED = 1;
287+
// The auto partitioning algorithm will increase partitions count depending on the load characteristics.
288+
// The auto partitioning algorithm will never decrease the number of partitions.
289+
AUTO_PARTITIONING_STRATEGY_SCALE_UP = 2;
290+
// The auto partitioning algorithm will both increase and decrease partitions count depending on the load characteristics.
291+
AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN = 3;
292+
}
293+
294+
// Partitioning settings for stream.
295+
message PartitioningSettings {
296+
// Auto merge would stop working when the partitions count reaches min_active_partitions
297+
// Zero value means default - 1.
298+
int64 min_active_partitions = 1;
299+
// Auto split would stop working when the partitions count reaches max_active_partitions
300+
// Zero value means default - 1.
301+
int64 max_active_partitions = 3;
302+
// Settings for the partitions count auto partitioning.
303+
AutoPartitioningSettings auto_partitioning_settings = 4;
304+
}
305+
306+
message AutoPartitioningSettings {
307+
// Strategy of auto partitioning.
308+
AutoPartitioningStrategy strategy = 1;
309+
// Partition write speed auto partitioning options.
310+
AutoPartitioningWriteSpeedStrategy partition_write_speed = 2;
311+
}
312+
313+
message AutoPartitioningWriteSpeedStrategy {
314+
//Partition will be auto partitioned up (divided into 2 partitions)
315+
//after write speed to the partition exceeds up_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window
316+
317+
//Partition will become a candidate to the auto partitioned down
318+
//after write speed doesn’t reach down_utilization_percent (in percentage of maximum write speed to the partition) for the period of time stabilization_window
319+
//This candidate partition will be auto partitioned down when other neighbour partition will become a candidate to the auto partitioning down and not earlier than a retention period.
320+
321+
// Zero value means default - 300.
322+
google.protobuf.Duration stabilization_window = 1;
323+
// Zero value means default - 90.
324+
int32 up_utilization_percent = 2;
325+
// Zero value means default - 30.
326+
int32 down_utilization_percent = 3;
327+
}
328+
279329
message CreateStreamRequest {
280330
Ydb.Operations.OperationParams operation_params = 1;
281331
// Name of the stream
@@ -292,6 +342,8 @@ message CreateStreamRequest {
292342
}
293343
// stream metering mode
294344
StreamModeDetails stream_mode_details = 7;
345+
346+
PartitioningSettings partitioning_settings = 8;
295347
}
296348

297349
message CreateStreamResponse {
@@ -399,6 +451,7 @@ message UpdateStreamRequest {
399451
// stream metering mode
400452
StreamModeDetails stream_mode_details = 7;
401453

454+
PartitioningSettings partitioning_settings = 8;
402455
}
403456

404457
message UpdateStreamResponse {
@@ -911,4 +964,3 @@ message UpdateStreamModeResponse {
911964

912965
message UpdateStreamModeResult {
913966
}
914-

ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.cpp

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,41 @@
1111

1212
namespace NYdb::NDataStreams::V1 {
1313

14+
TPartitioningSettingsBuilder<TCreateStreamSettings> TCreateStreamSettings::BeginConfigurePartitioningSettings() {
15+
return { *this };
16+
}
17+
18+
TPartitioningSettingsBuilder<TUpdateStreamSettings> TUpdateStreamSettings::BeginConfigurePartitioningSettings() {
19+
return { *this };
20+
}
21+
22+
void SetPartitionSettings(const TPartitioningSettings& ps, ::Ydb::DataStreams::V1::PartitioningSettings* pt) {
23+
pt->set_max_active_partitions(ps.GetMaxActivePartitions());
24+
pt->set_min_active_partitions(ps.GetMinActivePartitions());
25+
26+
::Ydb::DataStreams::V1::AutoPartitioningStrategy strategy;
27+
switch (ps.GetAutoPartitioningSettings().GetStrategy()) {
28+
case EAutoPartitioningStrategy::Unspecified:
29+
case EAutoPartitioningStrategy::Disabled:
30+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED;
31+
break;
32+
case EAutoPartitioningStrategy::ScaleUp:
33+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP;
34+
break;
35+
case EAutoPartitioningStrategy::ScaleUpAndDown:
36+
strategy = ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
37+
break;
38+
}
39+
40+
pt->mutable_auto_partitioning_settings()->set_strategy(strategy);
41+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
42+
->mutable_stabilization_window()->set_seconds(ps.GetAutoPartitioningSettings().GetStabilizationWindow().Seconds());
43+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
44+
->set_up_utilization_percent(ps.GetAutoPartitioningSettings().GetUpUtilizationPercent());
45+
pt->mutable_auto_partitioning_settings()->mutable_partition_write_speed()
46+
->set_down_utilization_percent(ps.GetAutoPartitioningSettings().GetDownUtilizationPercent());
47+
}
48+
1449
class TDataStreamsClient::TImpl : public TClientImplCommon<TDataStreamsClient::TImpl> {
1550
public:
1651
TImpl(std::shared_ptr <TGRpcConnectionsImpl> &&connections, const TCommonClientSettings &settings)
@@ -88,6 +123,10 @@ namespace NYdb::NDataStreams::V1 {
88123
*settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
89124
: Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
90125
}
126+
127+
if (settings.PartitioningSettings_.Defined()) {
128+
SetPartitionSettings(*settings.PartitioningSettings_, req.mutable_partitioning_settings());
129+
}
91130
});
92131
}
93132

@@ -372,6 +411,10 @@ namespace NYdb::NDataStreams::V1 {
372411
*settings.StreamMode_ == ESM_PROVISIONED ? Ydb::DataStreams::V1::StreamMode::PROVISIONED
373412
: Ydb::DataStreams::V1::StreamMode::ON_DEMAND);
374413
}
414+
415+
if (settings.PartitioningSettings_.Defined()) {
416+
SetPartitionSettings(*settings.PartitioningSettings_, req.mutable_partitioning_settings());
417+
}
375418
});
376419
}
377420

@@ -907,4 +950,3 @@ namespace NYdb::NDataStreams::V1 {
907950
TProtoRequestSettings settings
908951
);
909952
}
910-

ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,13 +101,155 @@ namespace NYdb::NDataStreams::V1 {
101101
TString ExplicitHashDecimal;
102102
};
103103

104+
enum class EAutoPartitioningStrategy: ui32 {
105+
Unspecified = 0,
106+
Disabled = 1,
107+
ScaleUp = 2,
108+
ScaleUpAndDown = 3,
109+
};
110+
111+
struct TCreateStreamSettings;
112+
struct TUpdateStreamSettings;
113+
114+
115+
template<typename TSettings>
116+
struct TPartitioningSettingsBuilder;
117+
template<typename TSettings>
118+
struct TAutoPartitioningSettingsBuilder;
119+
120+
struct TAutoPartitioningSettings {
121+
friend struct TAutoPartitioningSettingsBuilder<TCreateStreamSettings>;
122+
friend struct TAutoPartitioningSettingsBuilder<TUpdateStreamSettings>;
123+
public:
124+
TAutoPartitioningSettings()
125+
: Strategy_(EAutoPartitioningStrategy::Disabled)
126+
, StabilizationWindow_(TDuration::Seconds(0))
127+
, DownUtilizationPercent_(0)
128+
, UpUtilizationPercent_(0) {
129+
}
130+
TAutoPartitioningSettings(const Ydb::DataStreams::V1::AutoPartitioningSettings& settings);
131+
TAutoPartitioningSettings(EAutoPartitioningStrategy strategy, TDuration stabilizationWindow, ui64 downUtilizationPercent, ui64 upUtilizationPercent)
132+
: Strategy_(strategy)
133+
, StabilizationWindow_(stabilizationWindow)
134+
, DownUtilizationPercent_(downUtilizationPercent)
135+
, UpUtilizationPercent_(upUtilizationPercent) {}
136+
137+
EAutoPartitioningStrategy GetStrategy() const { return Strategy_; };
138+
TDuration GetStabilizationWindow() const { return StabilizationWindow_; };
139+
ui32 GetDownUtilizationPercent() const { return DownUtilizationPercent_; };
140+
ui32 GetUpUtilizationPercent() const { return UpUtilizationPercent_; };
141+
private:
142+
EAutoPartitioningStrategy Strategy_;
143+
TDuration StabilizationWindow_;
144+
ui32 DownUtilizationPercent_;
145+
ui32 UpUtilizationPercent_;
146+
};
147+
148+
149+
class TPartitioningSettings {
150+
using TSelf = TPartitioningSettings;
151+
friend struct TPartitioningSettingsBuilder<TCreateStreamSettings>;
152+
friend struct TPartitioningSettingsBuilder<TUpdateStreamSettings>;
153+
public:
154+
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), AutoPartitioningSettings_(){}
155+
TPartitioningSettings(const Ydb::DataStreams::V1::PartitioningSettings& settings);
156+
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
157+
: MinActivePartitions_(minActivePartitions)
158+
, MaxActivePartitions_(maxActivePartitions)
159+
, AutoPartitioningSettings_(autoscalingSettings) {
160+
}
161+
162+
ui64 GetMinActivePartitions() const { return MinActivePartitions_; };
163+
ui64 GetMaxActivePartitions() const { return MaxActivePartitions_; };
164+
TAutoPartitioningSettings GetAutoPartitioningSettings() const { return AutoPartitioningSettings_; };
165+
private:
166+
ui64 MinActivePartitions_;
167+
ui64 MaxActivePartitions_;
168+
TAutoPartitioningSettings AutoPartitioningSettings_;
169+
};
170+
104171
struct TCreateStreamSettings : public NYdb::TOperationRequestSettings<TCreateStreamSettings> {
105172
FLUENT_SETTING(ui32, ShardCount);
106173
FLUENT_SETTING_OPTIONAL(ui32, RetentionPeriodHours);
107174
FLUENT_SETTING_OPTIONAL(ui32, RetentionStorageMegabytes);
108175
FLUENT_SETTING(ui64, WriteQuotaKbPerSec);
109176
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
177+
178+
FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
179+
TPartitioningSettingsBuilder<TCreateStreamSettings> BeginConfigurePartitioningSettings();
110180
};
181+
template<typename TSettings>
182+
struct TAutoPartitioningSettingsBuilder {
183+
using TSelf = TAutoPartitioningSettingsBuilder<TSettings>;
184+
public:
185+
TAutoPartitioningSettingsBuilder(TPartitioningSettingsBuilder<TSettings>& parent, TAutoPartitioningSettings& settings): Parent_(parent), Settings_(settings) {}
186+
187+
TSelf Strategy(EAutoPartitioningStrategy value) {
188+
Settings_.Strategy_ = value;
189+
return *this;
190+
}
191+
192+
TSelf StabilizationWindow(TDuration value) {
193+
Settings_.StabilizationWindow_ = value;
194+
return *this;
195+
}
196+
197+
TSelf DownUtilizationPercent(ui32 value) {
198+
Settings_.DownUtilizationPercent_ = value;
199+
return *this;
200+
}
201+
202+
TSelf UpUtilizationPercent(ui32 value) {
203+
Settings_.UpUtilizationPercent_ = value;
204+
return *this;
205+
}
206+
207+
TPartitioningSettingsBuilder<TSettings>& EndConfigureAutoPartitioningSettings() {
208+
return Parent_;
209+
}
210+
211+
private:
212+
TPartitioningSettingsBuilder<TSettings>& Parent_;
213+
TAutoPartitioningSettings& Settings_;
214+
};
215+
216+
template<typename TSettings>
217+
struct TPartitioningSettingsBuilder {
218+
using TSelf = TPartitioningSettingsBuilder;
219+
public:
220+
TPartitioningSettingsBuilder(TSettings& parent): Parent_(parent) {}
221+
222+
TSelf MinActivePartitions(ui64 value) {
223+
if (!Parent_.PartitioningSettings_.Defined()) {
224+
Parent_.PartitioningSettings_.ConstructInPlace();
225+
}
226+
(*Parent_.PartitioningSettings_).MinActivePartitions_ = value;
227+
return *this;
228+
}
229+
230+
TSelf MaxActivePartitions(ui64 value) {
231+
if (!Parent_.PartitioningSettings_.Defined()) {
232+
Parent_.PartitioningSettings_.ConstructInPlace();
233+
}
234+
(*Parent_.PartitioningSettings_).MaxActivePartitions_ = value;
235+
return *this;
236+
}
237+
238+
TAutoPartitioningSettingsBuilder<TSettings> BeginConfigureAutoPartitioningSettings() {
239+
if (!Parent_.PartitioningSettings_.Defined()) {
240+
Parent_.PartitioningSettings_.ConstructInPlace();
241+
}
242+
return {*this, (*Parent_.PartitioningSettings_).AutoPartitioningSettings_};
243+
}
244+
245+
TSettings& EndConfigurePartitioningSettings() {
246+
return Parent_;
247+
}
248+
249+
private:
250+
TSettings& Parent_;
251+
};
252+
111253
struct TListStreamsSettings : public NYdb::TOperationRequestSettings<TListStreamsSettings> {
112254
FLUENT_SETTING(ui32, Limit);
113255
FLUENT_SETTING(TString, ExclusiveStartStreamName);
@@ -155,6 +297,8 @@ namespace NYdb::NDataStreams::V1 {
155297
FLUENT_SETTING(ui64, WriteQuotaKbPerSec);
156298
FLUENT_SETTING_OPTIONAL(EStreamMode, StreamMode);
157299

300+
FLUENT_SETTING_OPTIONAL(TPartitioningSettings, PartitioningSettings);
301+
TPartitioningSettingsBuilder<TUpdateStreamSettings> BeginConfigurePartitioningSettings();
158302
};
159303
struct TPutRecordSettings : public NYdb::TOperationRequestSettings<TPutRecordSettings> {};
160304
struct TPutRecordsSettings : public NYdb::TOperationRequestSettings<TPutRecordsSettings> {};

0 commit comments

Comments
 (0)