Skip to content

Commit 5285086

Browse files
authored
Merge 262817a into 8bf9bed
2 parents 8bf9bed + 262817a commit 5285086

File tree

8 files changed

+109
-7
lines changed

8 files changed

+109
-7
lines changed

ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp

+35
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,41 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
800800
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
801801
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);
802802

803+
bool firstPartitionFound = false;
804+
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
805+
if (partition.GetPartitionId() == 0) {
806+
firstPartitionFound = true;
807+
UNIT_ASSERT(!partition.GetActive());
808+
UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
809+
auto childIds = partition.GetChildPartitionIds();
810+
std::sort(childIds.begin(), childIds.end());
811+
UNIT_ASSERT_EQUAL(childIds[0], 1);
812+
UNIT_ASSERT_EQUAL(childIds[1], 2);
813+
}
814+
}
815+
UNIT_ASSERT(firstPartitionFound);
816+
817+
TString secondPartitionTo = "";
818+
TString thirdPartitionFrom = "";
819+
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
820+
if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
821+
UNIT_ASSERT(partition.GetActive());
822+
if (partition.GetPartitionId() == 1) {
823+
UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
824+
secondPartitionTo = *partition.GetToBound();
825+
}
826+
if (partition.GetPartitionId() == 2) {
827+
UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
828+
thirdPartitionFrom = *partition.GetFromBound();
829+
}
830+
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
831+
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
832+
}
833+
}
834+
835+
UNIT_ASSERT(!secondPartitionTo.Empty());
836+
UNIT_ASSERT(!thirdPartitionFrom.Empty());
837+
803838
auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
804839
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
805840
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));

ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation {
136136
return nullptr;
137137
}
138138

139+
if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
140+
alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy());
141+
}
142+
139143
if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) {
140144
const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds();
141145
if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {

ydb/public/api/protos/ydb_topic.proto

+5
Original file line numberDiff line numberDiff line change
@@ -1087,6 +1087,11 @@ message DescribeTopicResult {
10871087

10881088
// Partition location, filled only when include_location in request is true.
10891089
PartitionLocation partition_location = 6;
1090+
1091+
// Inclusive left border. Emptiness means -inf.
1092+
optional bytes from_bound = 7;
1093+
// Exclusive right border. Emptiness means +inf.
1094+
optional bytes to_bound = 8;
10901095
}
10911096

10921097
message TopicStats {

ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,10 @@ namespace {
507507
config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
508508
.Optional()
509509
.StoreResult(&StartingMessageTimestamp_);
510+
config.Opts->AddLongOption("important", "Is consumer important")
511+
.Optional()
512+
.DefaultValue(false)
513+
.StoreResult(&IsImportant_);
510514
config.Opts->SetFreeArgsNum(1);
511515
SetFreeArgTitle(0, "<topic-path>", "Topic path");
512516
AddAllowedCodecs(config, AllowedCodecs);
@@ -537,6 +541,7 @@ namespace {
537541
codecs.push_back(NTopic::ECodec::RAW);
538542
}
539543
consumerSettings.SetSupportedCodecs(codecs);
544+
consumerSettings.SetImportant(IsImportant_);
540545

541546
readRuleSettings.AppendAddConsumers(consumerSettings);
542547

ydb/public/lib/ydb_cli/commands/ydb_service_topic.h

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient {
122122

123123
private:
124124
TString ConsumerName_;
125+
bool IsImportant_;
125126
TMaybe<ui64> StartingMessageTimestamp_;
126127
};
127128

ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp

+25
Original file line numberDiff line numberDiff line change
@@ -395,13 +395,22 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
395395
for (const auto& partId : partitionInfo.parent_partition_ids()) {
396396
ParentPartitionIds_.push_back(partId);
397397
}
398+
398399
if (partitionInfo.has_partition_stats()) {
399400
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
400401
}
401402

402403
if (partitionInfo.has_partition_location()) {
403404
PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
404405
}
406+
407+
if (partitionInfo.has_from_bound()) {
408+
FromBound_ = TString(partitionInfo.from_bound());
409+
}
410+
411+
if (partitionInfo.has_to_bound()) {
412+
ToBound_ = TString(partitionInfo.to_bound());
413+
}
405414
}
406415

407416
TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
@@ -437,6 +446,14 @@ const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const {
437446
return PartitionLocation_;
438447
}
439448

449+
const TVector<ui64> TPartitionInfo::GetChildPartitionIds() const {
450+
return ChildPartitionIds_;
451+
}
452+
453+
const TVector<ui64> TPartitionInfo::GetParentPartitionIds() const {
454+
return ParentPartitionIds_;
455+
}
456+
440457
bool TPartitionInfo::GetActive() const {
441458
return Active_;
442459
}
@@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const {
445462
return PartitionId_;
446463
}
447464

465+
const TMaybe<TString>& TPartitionInfo::GetFromBound() const {
466+
return FromBound_;
467+
}
468+
469+
const TMaybe<TString>& TPartitionInfo::GetToBound() const {
470+
return ToBound_;
471+
}
472+
448473
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
449474
// TTopicClient
450475

ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h

+14-2
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,21 @@ class TPartitionInfo {
144144
const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
145145
const TMaybe<TPartitionLocation>& GetPartitionLocation() const;
146146

147+
const TMaybe<TString>& GetFromBound() const;
148+
const TMaybe<TString>& GetToBound() const;
149+
147150
private:
148151
ui64 PartitionId_;
149152
bool Active_;
150153
TVector<ui64> ChildPartitionIds_;
151154
TVector<ui64> ParentPartitionIds_;
155+
152156
TMaybe<TPartitionStats> PartitionStats_;
153157
TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
154158
TMaybe<TPartitionLocation> PartitionLocation_;
159+
160+
TMaybe<TString> FromBound_;
161+
TMaybe<TString> ToBound_;
155162
};
156163

157164
struct TAlterPartitioningSettings;
@@ -206,11 +213,11 @@ class TPartitioningSettings {
206213
public:
207214
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){}
208215
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
209-
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
216+
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {})
210217
: MinActivePartitions_(minActivePartitions)
211218
, MaxActivePartitions_(maxActivePartitions)
212219
, PartitionCountLimit_(0)
213-
, AutoPartitioningSettings_(autoscalingSettings)
220+
, AutoPartitioningSettings_(autoPartitioning)
214221
{
215222
}
216223

@@ -459,6 +466,11 @@ struct TConsumerSettings {
459466
return *this;
460467
}
461468

469+
TConsumerSettings& SetImportant(bool isImportant) {
470+
Important_ = isImportant;
471+
return *this;
472+
}
473+
462474
TSettings& EndAddConsumer() { return Parent_; };
463475

464476
private:

ydb/services/persqueue_v1/actors/schema_actors.cpp

+20-5
Original file line numberDiff line numberDiff line change
@@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
10701070

10711071
if (response.PQGroupInfo) {
10721072
const auto& pqDescr = response.PQGroupInfo->Description;
1073-
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
1074-
auto part = Result.add_partitions();
1075-
part->set_partition_id(i);
1076-
part->set_active(true);
1073+
for (auto& sourcePart: pqDescr.GetPartitions()) {
1074+
auto destPart = Result.add_partitions();
1075+
destPart->set_partition_id(sourcePart.GetPartitionId());
1076+
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
1077+
if (sourcePart.HasKeyRange()) {
1078+
if (sourcePart.GetKeyRange().HasFromBound()) {
1079+
destPart->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
1080+
}
1081+
if (sourcePart.GetKeyRange().HasToBound()) {
1082+
destPart->set_to_bound(sourcePart.GetKeyRange().GetToBound());
1083+
}
1084+
}
1085+
1086+
for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
1087+
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
1088+
}
1089+
1090+
for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
1091+
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
1092+
}
10771093
}
10781094

10791095
const auto &config = pqDescr.GetPQTabletConfig();
@@ -1401,7 +1417,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE
14011417
for (auto partData : record.GetPartResult()) {
14021418
if ((ui32)partData.GetPartition() != Settings.Partitions[0])
14031419
continue;
1404-
14051420
Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
14061421
partResult->set_partition_id(partData.GetPartition());
14071422
partResult->set_active(true);

0 commit comments

Comments
 (0)