Skip to content

Commit 922d72d

Browse files
type TPartitionId (#1472)
1 parent ec89d22 commit 922d72d

36 files changed

+353
-204
lines changed

ydb/core/persqueue/account_read_quoter.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ TAccountReadQuoter::TAccountReadQuoter(
2929
TActorId recepient,
3030
ui64 tabletId,
3131
const NPersQueue::TTopicConverterPtr& topicConverter,
32-
ui32 partition,
32+
const TPartitionId& partition,
3333
const TString& user,
3434
const TTabletCountersBase& counters
3535
)

ydb/core/persqueue/account_read_quoter.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
8989
TActorId recepient,
9090
ui64 tabletId,
9191
const NPersQueue::TTopicConverterPtr& topicConverter,
92-
ui32 partition,
92+
const TPartitionId& partition,
9393
const TString& user,
9494
const TTabletCountersBase& counters
9595
);
@@ -112,7 +112,7 @@ class TAccountReadQuoter : public TActorBootstrapped<TAccountReadQuoter> {
112112
const TActorId Recepient;
113113
const ui64 TabletId;
114114
const NPersQueue::TTopicConverterPtr TopicConverter;
115-
const ui32 Partition;
115+
const TPartitionId Partition;
116116
const TString User;
117117
const TString ConsumerPath;
118118
const ui64 ReadCreditBytes;

ydb/core/persqueue/blob.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ TPartitionedBlob::TPartitionedBlob(const TPartitionedBlob& x)
736736
, MaxBlobSize(x.MaxBlobSize)
737737
{}
738738

739-
TPartitionedBlob::TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
739+
TPartitionedBlob::TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo, const ui16 totalParts,
740740
const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize)
741741
: Partition(partition)
742742
, Offset(offset)

ydb/core/persqueue/blob.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ class TPartitionedBlob {
264264

265265
TPartitionedBlob(const TPartitionedBlob& x);
266266

267-
TPartitionedBlob(const ui32 partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
267+
TPartitionedBlob(const TPartitionId& partition, const ui64 offset, const TString& sourceId, const ui64 seqNo,
268268
const ui16 totalParts, const ui32 totalSize, THead& head, THead& newHead, bool headCleared, bool needCompactHead, const ui32 maxBlobSize);
269269

270270
std::optional<std::pair<TKey, TString>> Add(TClientBlob&& blob);
@@ -287,7 +287,7 @@ class TPartitionedBlob {
287287
TString CompactHead(bool glueHead, THead& head, bool glueNewHead, THead& newHead, ui32 estimatedSize);
288288

289289
private:
290-
ui32 Partition;
290+
TPartitionId Partition;
291291
ui64 Offset;
292292
ui16 InternalPartsCount;
293293
ui64 StartOffset;

ydb/core/persqueue/cache_eviction.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,19 @@ namespace NPQ {
5454
ERequestType Type;
5555
TActorId Sender;
5656
ui64 CookiePQ;
57-
ui32 Partition;
57+
TPartitionId Partition;
5858
ui32 MetadataWritesCount;
5959
TVector<TRequestedBlob> Blobs;
6060

61-
TKvRequest(ERequestType type, TActorId sender, ui64 cookie, ui32 partition)
61+
TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition)
6262
: Type(type)
6363
, Sender(sender)
6464
, CookiePQ(cookie)
6565
, Partition(partition)
6666
, MetadataWritesCount(0)
6767
{}
6868

69-
TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
69+
TBlobId GetBlobId(ui32 pos) const { return TBlobId(Partition.InternalPartitionId, Blobs[pos].Offset, Blobs[pos].PartNo, Blobs[pos].Count, Blobs[pos].InternalPartsCount); }
7070

7171
THolder<TEvKeyValue::TEvRequest> MakeKvRequest() const
7272
{
@@ -110,7 +110,7 @@ namespace NPQ {
110110
}
111111

112112
void Verify(const TRequestedBlob& blob) const {
113-
TKey key(TKeyPrefix::TypeData, 0, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
113+
TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
114114
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
115115
TClientBlob::CheckBlob(key, blob.Value);
116116
}
@@ -258,7 +258,7 @@ namespace NPQ {
258258

259259
for (const auto& blob : kvReq.Blobs) {
260260
// Touching blobs in L2. We don't need data here
261-
TCacheBlobL2 key = {kvReq.Partition, blob.Offset, blob.PartNo, nullptr};
261+
TCacheBlobL2 key = {kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, nullptr};
262262
if (blob.Cached)
263263
reqData->RequestedBlobs.push_back(key);
264264
else
@@ -275,10 +275,10 @@ namespace NPQ {
275275
THolder<TCacheL2Request> reqData = MakeHolder<TCacheL2Request>(TabletId);
276276

277277
for (const TRequestedBlob& reqBlob : kvReq.Blobs) {
278-
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
278+
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
279279
{ // there could be a new blob with same id (for big messages)
280280
if (RemoveExists(ctx, blob)) {
281-
TCacheBlobL2 removed = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr};
281+
TCacheBlobL2 removed = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, nullptr};
282282
reqData->RemovedBlobs.push_back(removed);
283283
}
284284
}
@@ -290,7 +290,7 @@ namespace NPQ {
290290
if (L1Strategy)
291291
L1Strategy->SaveHeadBlob(blob);
292292

293-
TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
293+
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
294294
reqData->StoredBlobs.push_back(blobL2);
295295

296296
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
@@ -314,7 +314,7 @@ namespace NPQ {
314314
continue;
315315

316316
const TRequestedBlob& reqBlob = kvReq.Blobs[i];
317-
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
317+
TBlobId blob(kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
318318
{
319319
TValueL1 value;
320320
if (CheckExists(ctx, blob, value)) {
@@ -328,7 +328,7 @@ namespace NPQ {
328328
Cache[blob] = valL1; // weak
329329
Counters.Inc(valL1);
330330

331-
TCacheBlobL2 blobL2 = {kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached};
331+
TCacheBlobL2 blobL2 = {kvReq.Partition.InternalPartitionId, reqBlob.Offset, reqBlob.PartNo, cached};
332332
reqData->StoredBlobs.push_back(blobL2);
333333

334334
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Prefetched blob in L1. Partition "
@@ -428,7 +428,7 @@ namespace NPQ {
428428
++numCached;
429429
continue;
430430
}
431-
TBlobId blobId(kvReq.Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
431+
TBlobId blobId(kvReq.Partition.InternalPartitionId, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
432432
TCacheValue::TPtr cached = GetValue(ctx, blobId);
433433
if (cached) {
434434
++numCached;

ydb/core/persqueue/event_helpers.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void ReplyPersQueueError(
1515
const TActorContext& ctx,
1616
ui64 tabletId,
1717
const TString& topicName,
18-
TMaybe<ui32> partition,
18+
TMaybe<TPartitionId> partition,
1919
NKikimr::TTabletCountersBase& counters,
2020
NKikimrServices::EServiceKikimr service,
2121
const ui64 responseCookie,

ydb/core/persqueue/event_helpers.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#pragma once
22

3+
#include "partition_id.h"
4+
35
#include <ydb/core/tablet/tablet_counters.h>
46
#include <ydb/library/services/services.pb.h>
57
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
@@ -14,7 +16,7 @@ void ReplyPersQueueError(
1416
const TActorContext& ctx,
1517
ui64 tabletId,
1618
const TString& topicName,
17-
TMaybe<ui32> partition,
19+
TMaybe<TPartitionId> partition,
1820
NKikimr::TTabletCountersBase& counters,
1921
NKikimrServices::EServiceKikimr service,
2022
const ui64 responseCookie,

ydb/core/persqueue/events/internal.h

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -170,8 +170,6 @@ struct TEvPQ {
170170
EvQuotaCountersUpdated,
171171
EvConsumerRemoved,
172172
EvFetchResponse,
173-
EvSourceIdRequest,
174-
EvSourceIdResponse,
175173
EvPublishRead,
176174
EvForgetRead,
177175
EvRegisterDirectReadSession,
@@ -313,13 +311,18 @@ struct TEvPQ {
313311
};
314312

315313
struct TEvMonResponse : public TEventLocal<TEvMonResponse, EvMonResponse> {
316-
TEvMonResponse(ui32 partition, const TVector<TString>& res, const TString& str)
314+
TEvMonResponse(const NPQ::TPartitionId& partition, const TVector<TString>& res, const TString& str)
317315
: Partition(partition)
318316
, Res(res)
319317
, Str(str)
320318
{}
321319

322-
ui32 Partition;
320+
TEvMonResponse(const TVector<TString>& res, const TString& str)
321+
: Res(res)
322+
, Str(str)
323+
{}
324+
325+
TMaybe<NPQ::TPartitionId> Partition;
323326
TVector<TString> Res;
324327
TString Str;
325328
};
@@ -398,11 +401,13 @@ struct TEvPQ {
398401
};
399402

400403
struct TEvPartitionOffsetsResponse : public TEventLocal<TEvPartitionOffsetsResponse, EvPartitionOffsetsResponse> {
401-
explicit TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult)
404+
TEvPartitionOffsetsResponse(NKikimrPQ::TOffsetsResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
402405
: PartResult(partResult)
406+
, Partition(partition)
403407
{}
404408

405409
NKikimrPQ::TOffsetsResponse::TPartResult PartResult;
410+
NPQ::TPartitionId Partition;
406411
};
407412

408413
struct TEvPartitionStatus : public TEventLocal<TEvPartitionStatus, EvPartitionStatus> {
@@ -424,11 +429,13 @@ struct TEvPQ {
424429
};
425430

426431
struct TEvPartitionStatusResponse : public TEventLocal<TEvPartitionStatusResponse, EvPartitionStatusResponse> {
427-
explicit TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult)
432+
TEvPartitionStatusResponse(NKikimrPQ::TStatusResponse::TPartResult& partResult, const NPQ::TPartitionId& partition)
428433
: PartResult(partResult)
434+
, Partition(partition)
429435
{}
430436

431437
NKikimrPQ::TStatusResponse::TPartResult PartResult;
438+
NPQ::TPartitionId Partition;
432439
};
433440

434441

@@ -442,11 +449,11 @@ struct TEvPQ {
442449
};
443450

444451
struct TEvInitComplete : public TEventLocal<TEvInitComplete, EvInitComplete> {
445-
explicit TEvInitComplete(const ui32 partition)
452+
explicit TEvInitComplete(const NPQ::TPartitionId& partition)
446453
: Partition(partition)
447454
{}
448455

449-
ui32 Partition;
456+
NPQ::TPartitionId Partition;
450457
};
451458

452459
struct TEvError : public TEventLocal<TEvError, EvError> {
@@ -462,7 +469,7 @@ struct TEvPQ {
462469
};
463470

464471
struct TEvBlobRequest : public TEventLocal<TEvBlobRequest, EvBlobRequest> {
465-
TEvBlobRequest(const TString& user, const ui64 cookie, const ui32 partition, const ui64 readOffset,
472+
TEvBlobRequest(const TString& user, const ui64 cookie, const NPQ::TPartitionId& partition, const ui64 readOffset,
466473
TVector<NPQ::TRequestedBlob>&& blobs)
467474
: User(user)
468475
, Cookie(cookie)
@@ -473,7 +480,7 @@ struct TEvPQ {
473480

474481
TString User;
475482
ui64 Cookie;
476-
ui32 Partition;
483+
NPQ::TPartitionId Partition;
477484
ui64 ReadOffset;
478485
TVector<NPQ::TRequestedBlob> Blobs;
479486
};
@@ -565,12 +572,12 @@ struct TEvPQ {
565572
};
566573

567574
struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
568-
explicit TEvPartitionConfigChanged(ui32 partition) :
575+
explicit TEvPartitionConfigChanged(const NPQ::TPartitionId& partition) :
569576
Partition(partition)
570577
{
571578
}
572579

573-
ui32 Partition;
580+
NPQ::TPartitionId Partition;
574581
};
575582

576583
struct TEvChangeCacheConfig : public TEventLocal<TEvChangeCacheConfig, EvChangeCacheConfig> {
@@ -588,35 +595,35 @@ struct TEvPQ {
588595
};
589596

590597
struct TEvPartitionCounters : public TEventLocal<TEvPartitionCounters, EvPartitionCounters> {
591-
TEvPartitionCounters(const ui32 partition, const TTabletCountersBase& counters)
598+
TEvPartitionCounters(const NPQ::TPartitionId& partition, const TTabletCountersBase& counters)
592599
: Partition(partition)
593600
{
594601
Counters.Populate(counters);
595602
}
596603

597-
const ui32 Partition;
604+
const NPQ::TPartitionId Partition;
598605
TTabletCountersBase Counters;
599606
};
600607

601608
struct TEvPartitionLabeledCounters : public TEventLocal<TEvPartitionLabeledCounters, EvPartitionLabeledCounters> {
602-
TEvPartitionLabeledCounters(const ui32 partition, const TTabletLabeledCountersBase& labeledCounters)
609+
TEvPartitionLabeledCounters(const NPQ::TPartitionId& partition, const TTabletLabeledCountersBase& labeledCounters)
603610
: Partition(partition)
604611
, LabeledCounters(labeledCounters)
605612
{
606613
}
607614

608-
const ui32 Partition;
615+
const NPQ::TPartitionId Partition;
609616
TTabletLabeledCountersBase LabeledCounters;
610617
};
611618

612619
struct TEvPartitionLabeledCountersDrop : public TEventLocal<TEvPartitionLabeledCountersDrop, EvPartitionLabeledCountersDrop> {
613-
TEvPartitionLabeledCountersDrop(const ui32 partition, const TString& group)
620+
TEvPartitionLabeledCountersDrop(const NPQ::TPartitionId& partition, const TString& group)
614621
: Partition(partition)
615622
, Group(group)
616623
{
617624
}
618625

619-
const ui32 Partition;
626+
const NPQ::TPartitionId Partition;
620627
TString Group;
621628
};
622629

@@ -798,7 +805,7 @@ struct TEvPQ {
798805
};
799806

800807
struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
801-
TEvTxCalcPredicateResult(ui64 step, ui64 txId, ui32 partition, bool predicate) :
808+
TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
802809
Step(step),
803810
TxId(txId),
804811
Partition(partition),
@@ -808,7 +815,7 @@ struct TEvPQ {
808815

809816
ui64 Step;
810817
ui64 TxId;
811-
ui32 Partition;
818+
NPQ::TPartitionId Partition;
812819
bool Predicate = false;
813820
};
814821

@@ -826,7 +833,7 @@ struct TEvPQ {
826833
};
827834

828835
struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
829-
TEvProposePartitionConfigResult(ui64 step, ui64 txId, ui32 partition) :
836+
TEvProposePartitionConfigResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
830837
Step(step),
831838
TxId(txId),
832839
Partition(partition)
@@ -835,7 +842,7 @@ struct TEvPQ {
835842

836843
ui64 Step;
837844
ui64 TxId;
838-
ui32 Partition;
845+
NPQ::TPartitionId Partition;
839846
};
840847

841848
struct TEvTxCommit : public TEventLocal<TEvTxCommit, EvTxCommit> {
@@ -850,7 +857,7 @@ struct TEvPQ {
850857
};
851858

852859
struct TEvTxCommitDone : public TEventLocal<TEvTxCommitDone, EvTxCommitDone> {
853-
TEvTxCommitDone(ui64 step, ui64 txId, ui32 partition) :
860+
TEvTxCommitDone(ui64 step, ui64 txId, const NPQ::TPartitionId& partition) :
854861
Step(step),
855862
TxId(txId),
856863
Partition(partition)
@@ -859,7 +866,7 @@ struct TEvPQ {
859866

860867
ui64 Step;
861868
ui64 TxId;
862-
ui32 Partition;
869+
NPQ::TPartitionId Partition;
863870
};
864871

865872
struct TEvTxRollback : public TEventLocal<TEvTxRollback, EvTxRollback> {

0 commit comments

Comments
 (0)