Skip to content

Commit c826c25

Browse files
LOGBROKER-8894 enable minSeqNo save in partition (iter3) (#1768)
1 parent f0d28ca commit c826c25

8 files changed

+70
-26
lines changed

ydb/core/persqueue/partition_sourcemanager.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ bool TPartitionSourceManager::HasParents() const {
6060

6161
TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format)
6262
: Manager(manager)
63-
, Node(Manager.GetPartitionNode())
63+
, Node(Manager.GetPartitionNode())
6464
, SourceIdWriter(format)
6565
, HeartbeatEmitter(Manager.Partition.SourceIdStorage) {
6666
}
@@ -104,6 +104,7 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager
104104
TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
105105
TPartitionSourceManager::TSourceInfo result(value.State);
106106
result.SeqNo = value.SeqNo;
107+
result.MinSeqNo = value.MinSeqNo;
107108
result.Offset = value.Offset;
108109
result.Explicit = value.Explicit;
109110
result.WriteTimestamp = value.WriteTimestamp;

ydb/core/persqueue/partition_sourcemanager.h

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class TPartitionSourceManager {
2525

2626
TSourceIdInfo::EState State;
2727
ui64 SeqNo = 0;
28+
ui64 MinSeqNo = 0;
2829
ui64 Offset = 0;
2930
bool Explicit = false;
3031
TInstant WriteTimestamp;

ydb/core/persqueue/partition_write.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,12 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
219219

220220
const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize;
221221
if (currentSize != 0 && currentSize + size > maxWriteInflightSize) {
222-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
222+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
223223
break;
224224
}
225225

226226
if (WaitingForSubDomainQuota(ctx, currentSize)) {
227-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
227+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
228228
break;
229229
}
230230

@@ -727,7 +727,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
727727
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
728728
"SourceId doesn't exist");
729729
}
730-
730+
731731
EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx);
732732
}
733733

@@ -1519,7 +1519,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
15191519
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
15201520

15211521
Y_ABORT_UNLESS(Head.PackedSize + NewHead.PackedSize <= 2 * MaxSizeCheck);
1522-
1522+
15231523
TInstant now = ctx.Now();
15241524
WriteCycleStartTime = now;
15251525

@@ -1592,7 +1592,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w
15921592

15931593
void TPartition::WriteBlobWithQuota(const TActorContext& /*ctx*/, THolder<TEvKeyValue::TEvRequest>&& request) {
15941594
PQ_LOG_T("TPartition::WriteBlobWithQuota.");
1595-
1595+
15961596
// Request quota and write blob.
15971597
// Mirrored topics are not quoted in local dc.
15981598
const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();

ydb/core/persqueue/pq_impl.cpp

+19-19
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
146146
PreparedResponse = std::make_shared<NKikimrClient::TResponse>();
147147
}
148148
}
149-
149+
150150
auto& responseRecord = isDirectRead ? *PreparedResponse : Response->Record;
151151
responseRecord.SetStatus(NMsgBusProxy::MSTATUS_OK);
152-
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
153-
152+
responseRecord.SetErrorCode(NPersQueue::NErrorCode::OK);
153+
154154
Y_ABORT_UNLESS(readResult.ResultSize() > 0);
155155
bool isStart = false;
156156
if (!responseRecord.HasPartitionResponse()) {
@@ -191,7 +191,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
191191
}
192192

193193
if (isNewMsg) {
194-
if (!isStart && readResult.GetResult(i).HasTotalParts()
194+
if (!isStart && readResult.GetResult(i).HasTotalParts()
195195
&& readResult.GetResult(i).GetTotalParts() + i > readResult.ResultSize()) //last blob is not full
196196
break;
197197
partResp->AddResult()->CopyFrom(readResult.GetResult(i));
@@ -292,7 +292,7 @@ class TReadProxy : public TActorBootstrapped<TReadProxy> {
292292
};
293293

294294

295-
TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
295+
TActorId CreateReadProxy(const TActorId& sender, const TActorId& tablet, ui32 tabletGeneration,
296296
const TDirectReadKey& directReadKey, const NKikimrClient::TPersQueueRequest& request,
297297
const TActorContext& ctx)
298298
{
@@ -304,7 +304,7 @@ class TResponseBuilder {
304304
public:
305305

306306
TResponseBuilder(const TActorId& sender, const TActorId& tablet, const TString& topicName, const ui32 partition, const ui64 messageNo,
307-
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
307+
const TString& reqId, const TMaybe<ui64> cookie, NMetrics::TResourceMetrics* resourceMetrics,
308308
const TActorContext& ctx)
309309
: Sender(sender)
310310
, Tablet(tablet)
@@ -639,7 +639,7 @@ struct TPersQueue::TReplyToActor {
639639
Event(std::move(event))
640640
{
641641
}
642-
642+
643643
TActorId ActorId;
644644
TEventBasePtr Event;
645645
};
@@ -840,7 +840,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
840840
ctx.Send(ctx.SelfID, new TEvents::TEvPoisonPill());
841841
return;
842842
}
843-
843+
844844
Y_ABORT_UNLESS(readRange.HasStatus());
845845
if (readRange.GetStatus() != NKikimrProto::OK && readRange.GetStatus() != NKikimrProto::NODATA) {
846846
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
@@ -1267,7 +1267,7 @@ void TPersQueue::FinishResponse(THashMap<ui64, TAutoPtr<TResponseBuilder>>::iter
12671267

12681268

12691269
void TPersQueue::Handle(TEvPersQueue::TEvUpdateConfig::TPtr& ev, const TActorContext& ctx)
1270-
{
1270+
{
12711271
if (!ConfigInited) {
12721272
UpdateConfigRequests.emplace_back(ev->Release(), ev->Sender);
12731273
return;
@@ -1304,7 +1304,7 @@ void TPersQueue::TrySendUpdateConfigResponses(const TActorContext& ctx)
13041304

13051305
ChangeConfigNotification.clear();
13061306
}
1307-
1307+
13081308
void TPersQueue::CreateTopicConverter(const NKikimrPQ::TPQTabletConfig& config,
13091309
NPersQueue::TConverterFactoryPtr& converterFactory,
13101310
NPersQueue::TTopicConverterPtr& topicConverter,
@@ -2109,7 +2109,7 @@ void TPersQueue::HandleReadRequest(
21092109
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::READ_ERROR_NO_SESSION,
21102110
TStringBuilder() << "Read prepare request with unknown(old?) session id " << cmd.GetSessionId());
21112111
return;
2112-
}
2112+
}
21132113
}
21142114

21152115
THolder<TEvPQ::TEvRead> event =
@@ -2375,7 +2375,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
23752375
}
23762376
ResponseProxy[responseCookie] = ans;
23772377
Counters->Simple()[COUNTER_PQ_TABLET_INFLIGHT].Set(ResponseProxy.size());
2378-
2378+
23792379
if (!ConfigInited) {
23802380
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::INITIALIZING, "tablet is not ready");
23812381
return;
@@ -2396,11 +2396,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext&
23962396
ReplyError(ctx, responseCookie, NPersQueue::NErrorCode::BAD_REQUEST, "no partition number");
23972397
return;
23982398
}
2399-
2399+
24002400
TPartitionId partition(req.GetPartition());
24012401
auto it = Partitions.find(partition);
24022402

2403-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
2403+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " got client message batch for topic '"
24042404
<< (TopicConverter ? TopicConverter->GetClientsideName() : "Undefined") << "' partition " << partition);
24052405

24062406
if (it == Partitions.end()) {
@@ -2859,7 +2859,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
28592859

28602860
std::unique_ptr<TEvTxProcessing::TEvReadSetAck> ack;
28612861
if (!(event.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) {
2862-
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
2862+
ack = std::make_unique<TEvTxProcessing::TEvReadSetAck>(*ev->Get(), TabletID());
28632863
}
28642864

28652865
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->Senders.contains(event.GetTabletProducer())) {
@@ -2927,7 +2927,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
29272927
void TPersQueue::Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx)
29282928
{
29292929
const TEvPQ::TEvProposePartitionConfigResult& event = *ev->Get();
2930-
2930+
29312931
auto tx = GetTransaction(ctx, event.TxId);
29322932
if (!tx) {
29332933
return;
@@ -3582,7 +3582,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
35823582
Y_ABORT_UNLESS(tx.PartitionRepliesCount <= tx.PartitionRepliesExpected);
35833583

35843584
PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTING" <<
3585-
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
3585+
", tx.PartitionRepliesCount=" << tx.PartitionRepliesCount <<
35863586
", tx.PartitionRepliesExpected=" << tx.PartitionRepliesExpected);
35873587
if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
35883588
Y_ABORT_UNLESS(!TxQueue.empty());
@@ -3728,7 +3728,7 @@ TPartition* TPersQueue::CreatePartitionActor(const TPartitionId& partitionId,
37283728
const TActorContext& ctx)
37293729
{
37303730
int channels = Info()->Channels.size() - NKeyValue::BLOB_CHANNEL; // channels 0,1 are reserved in tablet
3731-
Y_ABORT_UNLESS(channels > 0);
3731+
Y_ABORT_UNLESS(channels > 0);
37323732

37333733
return new TPartition(TabletID(),
37343734
partitionId,
@@ -3793,7 +3793,7 @@ void TPersQueue::EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig&
37933793
Y_VERIFY_S(was.contains(partition.GetPartitionId()), "New config is bad, missing partition " << partition.GetPartitionId());
37943794
}
37953795
}
3796-
3796+
37973797
void TPersQueue::InitTransactions(const NKikimrClient::TKeyValueResponse::TReadRangeResult& readRange,
37983798
THashMap<ui32, TVector<TTransaction>>& partitionTxs)
37993799
{

ydb/core/persqueue/sourceid.cpp

+8
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
103103

104104
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
105105
: SeqNo(seqNo)
106+
, MinSeqNo(seqNo)
106107
, Offset(offset)
107108
, WriteTimestamp(createTs)
108109
, CreateTimestamp(createTs)
@@ -111,6 +112,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
111112

112113
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
113114
: SeqNo(seqNo)
115+
, MinSeqNo(seqNo)
114116
, Offset(offset)
115117
, WriteTimestamp(createTs)
116118
, CreateTimestamp(createTs)
@@ -120,6 +122,7 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartb
120122

121123
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
122124
: SeqNo(seqNo)
125+
, MinSeqNo(seqNo)
123126
, Offset(offset)
124127
, CreateTimestamp(createTs)
125128
, Explicit(true)
@@ -133,6 +136,9 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<
133136
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
134137
auto copy = *this;
135138
copy.SeqNo = seqNo;
139+
if (copy.MinSeqNo == 0) {
140+
copy.MinSeqNo = seqNo;
141+
}
136142
copy.Offset = offset;
137143
copy.WriteTimestamp = writeTs;
138144

@@ -178,6 +184,7 @@ void TSourceIdInfo::Serialize(TBuffer& data) const {
178184
TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
179185
TSourceIdInfo result;
180186
result.SeqNo = proto.GetSeqNo();
187+
result.MinSeqNo = proto.GetMinSeqNo();
181188
result.Offset = proto.GetOffset();
182189
result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp());
183190
result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp());
@@ -197,6 +204,7 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
197204

198205
void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
199206
proto.SetSeqNo(SeqNo);
207+
proto.SetMinSeqNo(MinSeqNo);
200208
proto.SetOffset(Offset);
201209
proto.SetWriteTimestamp(WriteTimestamp.GetValue());
202210
proto.SetCreateTimestamp(CreateTimestamp.GetValue());

ydb/core/persqueue/sourceid.h

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct TSourceIdInfo {
2424
};
2525

2626
ui64 SeqNo = 0;
27+
ui64 MinSeqNo = 0;
2728
ui64 Offset = 0;
2829
TInstant WriteTimestamp;
2930
TInstant CreateTimestamp;

ydb/core/persqueue/ut/sourceid_ut.cpp

+33-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
258258
Y_UNIT_TEST(SourceIdStorageComplexDelete) {
259259
TSourceIdStorage storage;
260260
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
261-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
261+
storage.RegisterSourceId(TestSourceId(i), i, i , TInstant::Seconds(10 * i));
262262
}
263263

264264
NKikimrPQ::TPartitionConfig config;
@@ -460,6 +460,38 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
460460
}
461461
}
462462

463+
Y_UNIT_TEST(SourceIdMinSeqNo) {
464+
TSourceIdStorage storage;
465+
466+
const auto sourceId = TestSourceId(1);
467+
const auto sourceIdInfo = TSourceIdInfo(2, 10, TInstant::Seconds(100));
468+
const auto anotherSourceId = TestSourceId(2);
469+
const auto anotherSourceIdInfo = TSourceIdInfo(0, 20, TInstant::Seconds(200));
470+
471+
storage.RegisterSourceId(sourceId, sourceIdInfo);
472+
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
473+
{
474+
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
475+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 0);
476+
}
477+
478+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(3, 11, TInstant::Seconds(100)));
479+
{
480+
auto it = storage.GetInMemorySourceIds().find(sourceId);
481+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
482+
}
483+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(1, 12, TInstant::Seconds(100)));
484+
{
485+
auto it = storage.GetInMemorySourceIds().find(sourceId);
486+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
487+
}
488+
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(3, 12, TInstant::Seconds(100)));
489+
{
490+
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
491+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 3);
492+
}
493+
}
494+
463495
} // TSourceIdTests
464496

465497
} // namespace NKikimr::NPQ

ydb/core/protos/pqconfig.proto

+1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ message TMessageGroupInfo {
377377
}
378378

379379
optional uint64 SeqNo = 1;
380+
optional uint64 MinSeqNo = 9;
380381
optional uint64 Offset = 2;
381382
optional uint64 WriteTimestamp = 3; // TInstant::TValue
382383
optional uint64 CreateTimestamp = 4; // TInstant::TValue

0 commit comments

Comments
 (0)