@@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 {
94
94
95
95
return {};
96
96
}
97
+
98
+ void SetShardProperties (::Ydb::DataStreams::V1::Shard* shard,
99
+ const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition,
100
+ const bool autoPartitioningEnabled,
101
+ const size_t allShardsCount,
102
+ const std::map<ui64, std::pair<ui64, ui64>>& offsets) {
103
+ shard->set_shard_id (GetShardName (partition.GetPartitionId ()));
104
+
105
+
106
+ const auto & parents = partition.GetParentPartitionIds ();
107
+ if (parents.size () > 0 ) {
108
+ shard->set_parent_shard_id (GetShardName (parents[0 ]));
109
+ }
110
+ if (parents.size () > 1 ) {
111
+ shard->set_adjacent_parent_shard_id (GetShardName (parents[1 ]));
112
+ }
113
+
114
+ auto * rangeProto = shard->mutable_hash_key_range ();
115
+ if (autoPartitioningEnabled) {
116
+ NYql::NDecimal::TUint128 from = partition.HasKeyRange () && partition.GetKeyRange ().HasFromBound ()
117
+ ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetFromBound ()) + 1 : 0 ;
118
+ NYql::NDecimal::TUint128 to = partition.HasKeyRange () && partition.GetKeyRange ().HasToBound ()
119
+ ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetToBound ()): -1 ;
120
+ rangeProto->set_starting_hash_key (Uint128ToDecimalString (from));
121
+ rangeProto->set_ending_hash_key (Uint128ToDecimalString (to));
122
+ } else {
123
+ auto range = RangeFromShardNumber (partition.GetPartitionId (), allShardsCount);
124
+ rangeProto->set_starting_hash_key (Uint128ToDecimalString (range.Start ));
125
+ rangeProto->set_ending_hash_key (Uint128ToDecimalString (range.End ));
126
+ }
127
+
128
+ auto it = offsets.find (partition.GetPartitionId ());
129
+ if (it != offsets.end ()) {
130
+ auto * rangeProto = shard->mutable_sequence_number_range ();
131
+ rangeProto->set_starting_sequence_number (TStringBuilder () << it->second .first );
132
+
133
+ if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus ()) {
134
+ rangeProto->set_ending_sequence_number (TStringBuilder () << it->second .second );
135
+ }
136
+ }
137
+ }
97
138
}
98
139
99
140
@@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 {
845
886
break ;
846
887
} else {
847
888
auto * shard = description.add_shards ();
848
- shard->set_shard_id (shardName);
849
-
850
- const auto & parents = partition.GetParentPartitionIds ();
851
- if (parents.size () > 0 ) {
852
- shard->set_parent_shard_id (GetShardName (parents[0 ]));
853
- }
854
- if (parents.size () > 1 ) {
855
- shard->set_adjacent_parent_shard_id (GetShardName (parents[1 ]));
856
- }
857
-
858
- auto * rangeProto = shard->mutable_hash_key_range ();
859
- if (NPQ::SplitMergeEnabled (pqConfig)) {
860
- NYql::NDecimal::TUint128 from = partition.HasKeyRange () && partition.GetKeyRange ().HasFromBound () ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetFromBound ()) + 1 : 0 ;
861
- NYql::NDecimal::TUint128 to = partition.HasKeyRange () && partition.GetKeyRange ().HasToBound () ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange ().GetToBound ()): -1 ;
862
- rangeProto->set_starting_hash_key (Uint128ToDecimalString (from));
863
- rangeProto->set_ending_hash_key (Uint128ToDecimalString (to));
864
- } else {
865
- auto range = RangeFromShardNumber (partitionId, PQGroup.GetPartitions ().size ());
866
- rangeProto->set_starting_hash_key (Uint128ToDecimalString (range.Start ));
867
- rangeProto->set_ending_hash_key (Uint128ToDecimalString (range.End ));
868
- }
869
- auto it = StartEndOffsetsPerPartition.find (partitionId);
870
- if (it != StartEndOffsetsPerPartition.end ()) {
871
- auto * rangeProto = shard->mutable_sequence_number_range ();
872
- rangeProto->set_starting_sequence_number (TStringBuilder () << it->second .first );
873
- }
889
+ SetShardProperties (shard, partition, NPQ::SplitMergeEnabled (pqConfig), PQGroup.GetPartitions ().size (), StartEndOffsetsPerPartition);
874
890
}
875
891
}
876
892
}
@@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 {
1754
1770
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition> Shards;
1755
1771
ui32 LeftToRead = 0 ;
1756
1772
ui32 AllShardsCount = 0 ;
1773
+ bool AutoPartitioningEnabled = false ;
1757
1774
std::atomic<ui32> GotOffsetResponds;
1758
1775
std::vector<TActorId> Pipes;
1759
1776
};
@@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 {
1847
1864
}
1848
1865
1849
1866
using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition;
1850
- const auto & partitions = topicInfo.PQGroupInfo ->Description .GetPartitions ();
1867
+ const auto & description = topicInfo.PQGroupInfo ->Description ;
1868
+ const auto & partitions = description.GetPartitions ();
1851
1869
TString startingShardId = this ->GetProtoRequest ()->Getexclusive_start_shard_id ();
1852
1870
ui64 startingTimepoint{0 };
1853
1871
bool onlyOpenShards{true };
@@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 {
1895
1913
}}
1896
1914
};
1897
1915
1916
+ AutoPartitioningEnabled = NPQ::SplitMergeEnabled (description.GetPQTabletConfig ());
1917
+
1898
1918
const auto alreadyRead = NextToken.GetAlreadyRead ();
1899
1919
if (alreadyRead > (ui32)partitions.size ()) {
1900
1920
return ReplyWithError (Ydb::StatusIds::BAD_REQUEST, static_cast <size_t >(NYds::EErrorCodes::INVALID_ARGUMENT),
@@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 {
1970
1990
void TListShardsActor::SendResponse (const TActorContext& ctx) {
1971
1991
Ydb::DataStreams::V1::ListShardsResult result;
1972
1992
for (auto & shard : Shards) {
1973
- auto awsShard = result.Addshards ();
1974
- // TODO:
1975
- // awsShard->set_parent_shard_id("");
1976
- // awsShard->set_adjacent_parent_shard_id(prevShardName);
1977
- auto range = RangeFromShardNumber (shard.GetPartitionId (), AllShardsCount);
1978
- awsShard->mutable_hash_key_range ()->set_starting_hash_key (
1979
- Uint128ToDecimalString (range.Start ));
1980
- awsShard->mutable_hash_key_range ()->set_ending_hash_key (
1981
- Uint128ToDecimalString (range.End ));
1982
- awsShard->mutable_sequence_number_range ()->set_starting_sequence_number (
1983
- std::to_string (StartEndOffsetsPerPartition[shard.GetPartitionId ()].first ));
1984
- // TODO: fill it only for closed partitions
1985
- // awsShard->mutable_sequence_number_range()->set_ending_sequence_number(
1986
- // std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second));
1987
- awsShard->set_shard_id (GetShardName (shard.GetPartitionId ()));
1993
+ SetShardProperties (result.Addshards (), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition);
1988
1994
}
1989
1995
if (LeftToRead > 0 ) {
1990
1996
TNextToken token (StreamName, NextToken.GetAlreadyRead () + Shards.size (), MaxResults, TInstant::Now ().MilliSeconds ());
0 commit comments