Skip to content

Commit d6acae1

Browse files
authored
Set ending_sequence_number for inactive partitions of datashard (#9636) (#9668)
1 parent b678673 commit d6acae1

File tree

2 files changed

+63
-42
lines changed

2 files changed

+63
-42
lines changed

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 {
9494

9595
return {};
9696
}
97+
98+
void SetShardProperties(::Ydb::DataStreams::V1::Shard* shard,
99+
const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition,
100+
const bool autoPartitioningEnabled,
101+
const size_t allShardsCount,
102+
const std::map<ui64, std::pair<ui64, ui64>>& offsets) {
103+
shard->set_shard_id(GetShardName(partition.GetPartitionId()));
104+
105+
106+
const auto& parents = partition.GetParentPartitionIds();
107+
if (parents.size() > 0) {
108+
shard->set_parent_shard_id(GetShardName(parents[0]));
109+
}
110+
if (parents.size() > 1) {
111+
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
112+
}
113+
114+
auto* rangeProto = shard->mutable_hash_key_range();
115+
if (autoPartitioningEnabled) {
116+
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound()
117+
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
118+
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound()
119+
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
120+
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
121+
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
122+
} else {
123+
auto range = RangeFromShardNumber(partition.GetPartitionId(), allShardsCount);
124+
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
125+
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
126+
}
127+
128+
auto it = offsets.find(partition.GetPartitionId());
129+
if (it != offsets.end()) {
130+
auto* rangeProto = shard->mutable_sequence_number_range();
131+
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);
132+
133+
if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus()) {
134+
rangeProto->set_ending_sequence_number(TStringBuilder() << it->second.second);
135+
}
136+
}
137+
}
97138
}
98139

99140

@@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 {
845886
break;
846887
} else {
847888
auto* shard = description.add_shards();
848-
shard->set_shard_id(shardName);
849-
850-
const auto& parents = partition.GetParentPartitionIds();
851-
if (parents.size() > 0) {
852-
shard->set_parent_shard_id(GetShardName(parents[0]));
853-
}
854-
if (parents.size() > 1) {
855-
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
856-
}
857-
858-
auto* rangeProto = shard->mutable_hash_key_range();
859-
if (NPQ::SplitMergeEnabled(pqConfig)) {
860-
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
861-
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
862-
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
863-
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
864-
} else {
865-
auto range = RangeFromShardNumber(partitionId, PQGroup.GetPartitions().size());
866-
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
867-
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
868-
}
869-
auto it = StartEndOffsetsPerPartition.find(partitionId);
870-
if (it != StartEndOffsetsPerPartition.end()) {
871-
auto* rangeProto = shard->mutable_sequence_number_range();
872-
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);
873-
}
889+
SetShardProperties(shard, partition, NPQ::SplitMergeEnabled(pqConfig), PQGroup.GetPartitions().size(), StartEndOffsetsPerPartition);
874890
}
875891
}
876892
}
@@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 {
17541770
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition> Shards;
17551771
ui32 LeftToRead = 0;
17561772
ui32 AllShardsCount = 0;
1773+
bool AutoPartitioningEnabled = false;
17571774
std::atomic<ui32> GotOffsetResponds;
17581775
std::vector<TActorId> Pipes;
17591776
};
@@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 {
18471864
}
18481865

18491866
using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition;
1850-
const auto& partitions = topicInfo.PQGroupInfo->Description.GetPartitions();
1867+
const auto& description = topicInfo.PQGroupInfo->Description;
1868+
const auto& partitions = description.GetPartitions();
18511869
TString startingShardId = this->GetProtoRequest()->Getexclusive_start_shard_id();
18521870
ui64 startingTimepoint{0};
18531871
bool onlyOpenShards{true};
@@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 {
18951913
}}
18961914
};
18971915

1916+
AutoPartitioningEnabled = NPQ::SplitMergeEnabled(description.GetPQTabletConfig());
1917+
18981918
const auto alreadyRead = NextToken.GetAlreadyRead();
18991919
if (alreadyRead > (ui32)partitions.size()) {
19001920
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
@@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 {
19701990
void TListShardsActor::SendResponse(const TActorContext& ctx) {
19711991
Ydb::DataStreams::V1::ListShardsResult result;
19721992
for (auto& shard : Shards) {
1973-
auto awsShard = result.Addshards();
1974-
// TODO:
1975-
// awsShard->set_parent_shard_id("");
1976-
// awsShard->set_adjacent_parent_shard_id(prevShardName);
1977-
auto range = RangeFromShardNumber(shard.GetPartitionId(), AllShardsCount);
1978-
awsShard->mutable_hash_key_range()->set_starting_hash_key(
1979-
Uint128ToDecimalString(range.Start));
1980-
awsShard->mutable_hash_key_range()->set_ending_hash_key(
1981-
Uint128ToDecimalString(range.End));
1982-
awsShard->mutable_sequence_number_range()->set_starting_sequence_number(
1983-
std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].first));
1984-
//TODO: fill it only for closed partitions
1985-
//awsShard->mutable_sequence_number_range()->set_ending_sequence_number(
1986-
// std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second));
1987-
awsShard->set_shard_id(GetShardName(shard.GetPartitionId()));
1993+
SetShardProperties(result.Addshards(), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition);
19881994
}
19891995
if (LeftToRead > 0) {
19901996
TNextToken token(StreamName, NextToken.GetAlreadyRead() + Shards.size(), MaxResults, TInstant::Now().MilliSeconds());

ydb/services/datastreams/datastreams_ut.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2756,6 +2756,18 @@ Y_UNIT_TEST_SUITE(DataStreams) {
27562756
UNIT_ASSERT_VALUES_EQUAL(description.partitioning_settings().auto_partitioning_settings().partition_write_speed().down_utilization_percent(), 13);
27572757
}
27582758

2759+
{
2760+
std::vector<NYDS_V1::TDataRecord> records;
2761+
for (ui32 i = 1; i <= 30; ++i) {
2762+
TString data = Sprintf("%04u", i);
2763+
records.push_back({data, data, ""});
2764+
}
2765+
auto result = testServer.DataStreamsClient->PutRecords(streamName, records).ExtractValueSync();
2766+
Cerr << result.GetResult().DebugString() << Endl;
2767+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2768+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2769+
}
2770+
27592771
{
27602772
ui64 txId = 107;
27612773
SplitPartition(*kikimr->GetRuntime(), txId, 1, "a");
@@ -2776,8 +2788,11 @@ Y_UNIT_TEST_SUITE(DataStreams) {
27762788

27772789
UNIT_ASSERT_VALUES_EQUAL(description.shards().size(), 5);
27782790
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().starting_sequence_number(), "0");
2791+
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().ending_sequence_number(), "");
27792792
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().starting_hash_key(), "0");
27802793
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().ending_hash_key(), "113427455640312821154458202477256070484");
2794+
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().starting_sequence_number(), "0");
2795+
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().ending_sequence_number(), "8");
27812796
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().starting_hash_key(), "113427455640312821154458202477256070485");
27822797
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().ending_hash_key(), "226854911280625642308916404954512140969");
27832798
UNIT_ASSERT_VALUES_EQUAL(description.shards(2).hash_key_range().starting_hash_key(), "226854911280625642308916404954512140970");

0 commit comments

Comments
 (0)