Skip to content

type TPartitionId #1472

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/persqueue/account_read_quoter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ TAccountReadQuoter::TAccountReadQuoter(
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/account_read_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
TActorId recepient,
ui64 tabletId,
const NPersQueue::TTopicConverterPtr& topicConverter,
ui32 partition,
const TPartitionId& partition,
const TString& user,
const TTabletCountersBase& counters
);
Expand All @@ -112,7 +112,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
const TActorId Recepient;
const ui64 TabletId;
const NPersQueue::TTopicConverterPtr TopicConverter;
const ui32 Partition;
const TPartitionId Partition;
const TString User;
const TString ConsumerPath;
const ui64 ReadCreditBytes;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/blob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
, MaxBlobSize(x.MaxBlobSize)
{}

TPartitionedBlob::TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
: Partition(partition)
, Offset(offset)
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class TPartitionedBlob {

TPartitionedBlob(const TPartitionedBlob& x);

TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);

std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
Expand All @@ -287,7 +287,7 @@ class TPartitionedBlob {
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);

private:
ui32 Partition;
TPartitionId Partition;
ui64 Offset;
ui16 InternalPartsCount;
ui64 StartOffset;
Expand Down
22 changes: 11 additions & 11 deletions ydb/core/persqueue/cache_eviction.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ namespace NPQ {
ERequestType Type;
TActorId Sender;
ui64 CookiePQ;
ui32 Partition;
TPartitionId Partition;
ui32 MetadataWritesCount;
TVector<TRequestedBlob> Blobs;

TKvRequest(ERequestType type, TActorId sender, ui64 cookie, ui32 partition)
TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition)
: Type(type)
, Sender(sender)
, CookiePQ(cookie)
, Partition(partition)
, MetadataWritesCount(0)
{}

TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition.InternalPartitionId, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }

THolder<TEvKeyValue::TEvRequest> MakeKvRequest() const
{
Expand Down Expand Up @@ -110,7 +110,7 @@ namespace NPQ {
}

void Verify(const TRequestedBlob& blob) const {
TKey key(TKeyPrefix::TypeData, 0, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
TClientBlob::CheckBlob(key, blob.Value);
}
Expand Down Expand Up @@ -258,7 +258,7 @@ namespace NPQ {

for (const auto& blob : kvReq.Blobs) {
// Touching blobs in L2. We don't need data here
TCacheBlobL2 key = {kvReq.Partition, blob.Offset, blob.PartNo, nullptr};
TCacheBlobL2 key = {kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, nullptr};
if (blob.Cached)
reqData->RequestedBlobs.push_back(key);
else
Expand All @@ -275,10 +275,10 @@ namespace NPQ {
THolder<TCacheL2Request> reqData = MakeHolder<TCacheL2Request>(TabletId);

for (const TRequestedBlob& reqBlob : kvReq.Blobs) {
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{ // there could be a new blob with same id (for big messages)
if (RemoveExists(ctx, blob)) {
TCacheBlobL2 removed = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr};
TCacheBlobL2 removed = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, nullptr};
reqData->RemovedBlobs.push_back(removed);
}
}
Expand All @@ -290,7 +290,7 @@ namespace NPQ {
if (L1Strategy)
L1Strategy->SaveHeadBlob(blob);

TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
Expand All @@ -314,7 +314,7 @@ namespace NPQ {
continue;

const TRequestedBlob& reqBlob = kvReq.Blobs[i];
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
{
TValueL1 value;
if (CheckExists(ctx, blob, value)) {
Expand All @@ -328,7 +328,7 @@ namespace NPQ {
Cache[blob] = valL1; // weak
Counters.Inc(valL1);

TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
reqData->StoredBlobs.push_back(blobL2);

LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition "
Expand Down Expand Up @@ -428,7 +428,7 @@ namespace NPQ {
++numCached;
continue;
}
TBlobId blobId(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
TBlobId blobId(kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
TCacheValue::TPtr cached = GetValue(ctx, blobId);
if (cached) {
++numCached;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/event_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
TMaybe<ui32> partition,
TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/persqueue/event_helpers.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "partition_id.h"

#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
Expand All @@ -14,7 +16,7 @@ void ReplyPersQueueError(
const TActorContext& ctx,
ui64 tabletId,
const TString& topicName,
TMaybe<ui32> partition,
TMaybe<TPartitionId> partition,
NKikimr::TTabletCountersBase& counters,
NKikimrServices::EServiceKikimr service,
const ui64 responseCookie,
Expand Down
55 changes: 31 additions & 24 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ struct TEvPQ {
EvQuotaCountersUpdated,
EvConsumerRemoved,
EvFetchResponse,
EvSourceIdRequest,
EvSourceIdResponse,
EvPublishRead,
EvForgetRead,
EvRegisterDirectReadSession,
Expand Down Expand Up @@ -313,13 +311,18 @@ struct TEvPQ {
};

struct TEvMonResponse : public TEventLocal<TEvMonResponse, EvMonResponse> {
TEvMonResponse(ui32 partition, const TVector<TString>& res, const TString& str)
TEvMonResponse(const NPQ::TPartitionId& partition, const TVector<TString>& res, const TString& str)
: Partition(partition)
, Res(res)
, Str(str)
{}

ui32 Partition;
TEvMonResponse(const TVector<TString>& res, const TString& str)
: Res(res)
, Str(str)
{}

TMaybe<NPQ::TPartitionId> Partition;
TVector<TString> Res;
TString Str;
};
Expand Down Expand Up @@ -398,11 +401,13 @@ struct TEvPQ {
};

struct TEvPartitionOffsetsResponse : public TEventLocal<TEvPartitionOffsetsResponse, EvPartitionOffsetsResponse> {
explicit TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult)
TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
, Partition(partition)
{}

NKikimrPQ::TOffsetsResponse::TPartResult PartResult;
NPQ::TPartitionId Partition;
};

struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> {
Expand All @@ -424,11 +429,13 @@ struct TEvPQ {
};

struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> {
explicit TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult)
TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
: PartResult(partResult)
, Partition(partition)
{}

NKikimrPQ::TStatusResponse::TPartResult PartResult;
NPQ::TPartitionId Partition;
};


Expand All @@ -442,11 +449,11 @@ struct TEvPQ {
};

struct TEvInitComplete : public TEventLocal<TEvInitComplete, EvInitComplete> {
explicit TEvInitComplete(const ui32 partition)
explicit TEvInitComplete(const NPQ::TPartitionId& partition)
: Partition(partition)
{}

ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvError : public TEventLocal<TEvError, EvError> {
Expand All @@ -462,7 +469,7 @@ struct TEvPQ {
};

struct TEvBlobRequest : public TEventLocal<TEvBlobRequest, EvBlobRequest> {
TEvBlobRequest(const TString& user, const ui64 cookie, const ui32 partition, const ui64 readOffset,
TEvBlobRequest(const TString& user, const ui64 cookie, const NPQ::TPartitionId& partition, const ui64 readOffset,
TVector<NPQ::TRequestedBlob>&& blobs)
: User(user)
, Cookie(cookie)
Expand All @@ -473,7 +480,7 @@ struct TEvPQ {

TString User;
ui64 Cookie;
ui32 Partition;
NPQ::TPartitionId Partition;
ui64 ReadOffset;
TVector<NPQ::TRequestedBlob> Blobs;
};
Expand Down Expand Up @@ -565,12 +572,12 @@ struct TEvPQ {
};

struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
explicit TEvPartitionConfigChanged(ui32 partition) :
explicit TEvPartitionConfigChanged(const NPQ::TPartitionId& partition) :
Partition(partition)
{
}

ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> {
Expand All @@ -588,35 +595,35 @@ struct TEvPQ {
};

struct TEvPartitionCounters : public TEventLocal<TEvPartitionCounters, EvPartitionCounters> {
TEvPartitionCounters(const ui32 partition, const TTabletCountersBase& counters)
TEvPartitionCounters(const NPQ::TPartitionId& partition, const TTabletCountersBase& counters)
: Partition(partition)
{
Counters.Populate(counters);
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TTabletCountersBase Counters;
};

struct TEvPartitionLabeledCounters : public TEventLocal<TEvPartitionLabeledCounters, EvPartitionLabeledCounters> {
TEvPartitionLabeledCounters(const ui32 partition, const TTabletLabeledCountersBase& labeledCounters)
TEvPartitionLabeledCounters(const NPQ::TPartitionId& partition, const TTabletLabeledCountersBase& labeledCounters)
: Partition(partition)
, LabeledCounters(labeledCounters)
{
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TTabletLabeledCountersBase LabeledCounters;
};

struct TEvPartitionLabeledCountersDrop : public TEventLocal<TEvPartitionLabeledCountersDrop, EvPartitionLabeledCountersDrop> {
TEvPartitionLabeledCountersDrop(const ui32 partition, const TString& group)
TEvPartitionLabeledCountersDrop(const NPQ::TPartitionId& partition, const TString& group)
: Partition(partition)
, Group(group)
{
}

const ui32 Partition;
const NPQ::TPartitionId Partition;
TString Group;
};

Expand Down Expand Up @@ -798,7 +805,7 @@ struct TEvPQ {
};

struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) :
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
Step(step),
TxId(txId),
Partition(partition),
Expand All @@ -808,7 +815,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
bool Predicate = false;
};

Expand All @@ -826,7 +833,7 @@ struct TEvPQ {
};

struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) :
TEvProposePartitionConfigResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
Expand All @@ -835,7 +842,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
Expand All @@ -850,7 +857,7 @@ struct TEvPQ {
};

struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) :
TEvTxCommitDone(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
Step(step),
TxId(txId),
Partition(partition)
Expand All @@ -859,7 +866,7 @@ struct TEvPQ {

ui64 Step;
ui64 TxId;
ui32 Partition;
NPQ::TPartitionId Partition;
};

struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> {
Expand Down
Loading