Skip to content

Commit e90285a

Browse files
save the WriteId + PartitionId (#1037)
1 parent 829da8b commit e90285a

File tree

8 files changed

+570
-72
lines changed

8 files changed

+570
-72
lines changed

ydb/core/persqueue/events/internal.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,7 @@ struct TEvPQ {
10291029
{
10301030
}
10311031

1032-
ui32 Cookie; // ShadowPartitionId
1032+
ui32 Cookie; // InternalPartitionId
10331033
};
10341034

10351035
struct TEvGetWriteInfoResponse : public TEventLocal<TEvGetWriteInfoResponse, EvGetWriteInfoResponse> {
@@ -1044,14 +1044,14 @@ struct TEvPQ {
10441044
{
10451045
}
10461046

1047-
ui32 Cookie; // ShadowPartitionId
1047+
ui32 Cookie; // InternalPartitionId
10481048
THashMap<TString, NPQ::TSeqNoRange> SeqNo; // SourceId -> (MinSeqNo, MaxSeqNo)
10491049
std::deque<NPQ::TDataKey> BodyKeys;
10501050
TVector<NPQ::TClientBlob> Head;
10511051
};
10521052

10531053
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
1054-
ui32 Cookie; // ShadowPartitionId
1054+
ui32 Cookie; // InternalPartitionId
10551055
TString Message;
10561056

10571057
TEvGetWriteInfoError(ui32 cookie, TString message) :

ydb/core/persqueue/partition_id.h

+5-6
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ namespace NKikimr::NPQ {
1313
class TPartitionId {
1414
public:
1515
TPartitionId() = default;
16+
1617
explicit TPartitionId(ui32 partition) :
17-
TPartitionId(partition, Nothing(), partition)
18+
OriginalPartitionId(partition),
19+
InternalPartitionId(partition)
1820
{
1921
}
2022

@@ -46,12 +48,9 @@ class TPartitionId {
4648
}
4749
}
4850

49-
//
50-
// FIXME: используется в RequestRange
51-
//
52-
TPartitionId NextInternalPartitionId() const
51+
bool IsSupportivePartition() const
5352
{
54-
return {OriginalPartitionId, WriteId, InternalPartitionId + 1};
53+
return WriteId.Defined();
5554
}
5655

5756
ui32 OriginalPartitionId = 0;

ydb/core/persqueue/partition_init.cpp

+23-9
Original file line numberDiff line numberDiff line change
@@ -990,32 +990,46 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) {
990990
return !diskIsOk;
991991
}
992992

993+
static std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, const TPartitionId& partition)
994+
{
995+
TKeyPrefix from(type, partition);
996+
TKeyPrefix to(type, TPartitionId(partition.OriginalPartitionId, partition.WriteId, partition.InternalPartitionId + 1));
997+
998+
return {std::move(from), std::move(to)};
999+
}
1000+
9931001
static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,
9941002
TKeyPrefix::EType c, bool includeData = false, const TString& key = "", bool dropTmp = false) {
9951003
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
996-
auto read = request->Record.AddCmdReadRange();
997-
auto range = read->MutableRange();
998-
TKeyPrefix from(c, partition);
1004+
1005+
auto keyPrefixes = MakeKeyPrefixRange(c, partition);
1006+
TKeyPrefix& from = keyPrefixes.first;
1007+
const TKeyPrefix& to = keyPrefixes.second;
1008+
9991009
if (!key.empty()) {
10001010
Y_ABORT_UNLESS(key.StartsWith(TStringBuf(from.Data(), from.Size())));
10011011
from.Clear();
10021012
from.Append(key.data(), key.size());
10031013
}
1004-
range->SetFrom(from.Data(), from.Size());
10051014

1006-
TKeyPrefix to(c, partition.NextInternalPartitionId());
1015+
auto read = request->Record.AddCmdReadRange();
1016+
auto range = read->MutableRange();
1017+
1018+
range->SetFrom(from.Data(), from.Size());
10071019
range->SetTo(to.Data(), to.Size());
10081020

1009-
if(includeData)
1021+
if (includeData)
10101022
read->SetIncludeData(true);
10111023

10121024
if (dropTmp) {
1025+
keyPrefixes = MakeKeyPrefixRange(TKeyPrefix::TypeTmpData, partition);
1026+
const TKeyPrefix& from = keyPrefixes.first;
1027+
const TKeyPrefix& to = keyPrefixes.second;
1028+
10131029
auto del = request->Record.AddCmdDeleteRange();
10141030
auto range = del->MutableRange();
1015-
TKeyPrefix from(TKeyPrefix::TypeTmpData, partition);
1016-
range->SetFrom(from.Data(), from.Size());
10171031

1018-
TKeyPrefix to(TKeyPrefix::TypeTmpData, partition.NextInternalPartitionId());
1032+
range->SetFrom(from.Data(), from.Size());
10191033
range->SetTo(to.Data(), to.Size());
10201034
}
10211035

0 commit comments

Comments
 (0)