Skip to content

Commit 3ad495b

Browse files
Merge 8b52a2f into 24479f4
2 parents 24479f4 + 8b52a2f commit 3ad495b

8 files changed

+79
-35
lines changed

ydb/core/persqueue/partition_sourcemanager.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ bool TPartitionSourceManager::HasParents() const {
6060

6161
TPartitionSourceManager::TModificationBatch::TModificationBatch(TPartitionSourceManager& manager, ESourceIdFormat format)
6262
: Manager(manager)
63-
, Node(Manager.GetPartitionNode())
63+
, Node(Manager.GetPartitionNode())
6464
, SourceIdWriter(format)
6565
, HeartbeatEmitter(Manager.Partition.SourceIdStorage) {
6666
}
@@ -104,6 +104,7 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager
104104
TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
105105
TPartitionSourceManager::TSourceInfo result(value.State);
106106
result.SeqNo = value.SeqNo;
107+
result.MinSeqNo = value.MinSeqNo;
107108
result.Offset = value.Offset;
108109
result.Explicit = value.Explicit;
109110
result.WriteTimestamp = value.WriteTimestamp;
@@ -147,7 +148,7 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons
147148

148149
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
149150
if (InMemory == MemoryStorage().end()) {
150-
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
151+
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, seqNo, offset, timestamp);
151152
} else {
152153
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
153154
}

ydb/core/persqueue/partition_sourcemanager.h

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class TPartitionSourceManager {
2525

2626
TSourceIdInfo::EState State;
2727
ui64 SeqNo = 0;
28+
ui64 MinSeqNo = 0;
2829
ui64 Offset = 0;
2930
bool Explicit = false;
3031
TInstant WriteTimestamp;

ydb/core/persqueue/partition_write.cpp

+10-10
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,12 @@ void TPartition::ProcessReserveRequests(const TActorContext& ctx) {
219219

220220
const ui64 currentSize = ReservedSize + WriteInflightSize + WriteCycleSize;
221221
if (currentSize != 0 && currentSize + size > maxWriteInflightSize) {
222-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
222+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: maxWriteInflightSize riched. Partition: " << Partition);
223223
break;
224224
}
225225

226226
if (WaitingForSubDomainQuota(ctx, currentSize)) {
227-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
227+
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Reserve processing: SubDomainOutOfSpace. Partition: " << Partition);
228228
break;
229229
}
230230

@@ -327,7 +327,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
327327
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
328328
Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion);
329329
TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
330-
SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp);
330+
SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp);
331331
} else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) {
332332
SourceIdStorage.RegisterSourceId(s, it->second.Updated(
333333
seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
@@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
378378
}
379379

380380
Y_ABORT_UNLESS(body.AssignedOffset);
381-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
381+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
382382
ReplyOk(ctx, response.GetCookie());
383383
} else if (response.IsDeregisterMessageGroup()) {
384384
const auto& body = response.GetDeregisterMessageGroup().Body;
@@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
399399
}
400400

401401
Y_ABORT_UNLESS(body.AssignedOffset);
402-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
402+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
403403
}
404404

405405
ReplyOk(ctx, response.GetCookie());
@@ -727,7 +727,7 @@ void TPartition::HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const
727727
return ReplyError(ctx, ev->Get()->Cookie, NPersQueue::NErrorCode::SOURCEID_DELETED,
728728
"SourceId doesn't exist");
729729
}
730-
730+
731731
EmplaceRequest(TDeregisterMessageGroupMsg(*ev->Get()), ctx);
732732
}
733733

@@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m
836836
}
837837

838838
body.AssignedOffset = parameters.CurOffset;
839-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
839+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
840840

841841
return ProcessResult::Continue;
842842
}
@@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg,
859859
}
860860

861861
body.AssignedOffset = parameters.CurOffset;
862-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
862+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
863863
}
864864

865865
return ProcessResult::Continue;
@@ -1519,7 +1519,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) {
15191519
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
15201520

15211521
Y_ABORT_UNLESS(Head.PackedSize + NewHead.PackedSize <= 2 * MaxSizeCheck);
1522-
1522+
15231523
TInstant now = ctx.Now();
15241524
WriteCycleStartTime = now;
15251525

@@ -1592,7 +1592,7 @@ bool TPartition::WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 w
15921592

15931593
void TPartition::WriteBlobWithQuota(const TActorContext& /*ctx*/, THolder<TEvKeyValue::TEvRequest>&& request) {
15941594
PQ_LOG_T("TPartition::WriteBlobWithQuota.");
1595-
1595+
15961596
// Request quota and write blob.
15971597
// Mirrored topics are not quoted in local dc.
15981598
const bool skip = !IsQuotingEnabled() || TopicWriteQuotaResourcePath.empty();

ydb/core/persqueue/pq_impl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
15001500
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
15011501
}
15021502

1503-
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
1503+
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange));
15041504
}
15051505

15061506
for (const auto& partition : cfg.GetPartitions()) {

ydb/core/persqueue/sourceid.cpp

+11-3
Original file line numberDiff line numberDiff line change
@@ -101,25 +101,28 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
101101
SourceIdsWithHeartbeat.erase(sourceId);
102102
}
103103

104-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
104+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs)
105105
: SeqNo(seqNo)
106+
, MinSeqNo(minSeqNo)
106107
, Offset(offset)
107108
, WriteTimestamp(createTs)
108109
, CreateTimestamp(createTs)
109110
{
110111
}
111112

112-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
113+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
113114
: SeqNo(seqNo)
115+
, MinSeqNo(minSeqNo)
114116
, Offset(offset)
115117
, WriteTimestamp(createTs)
116118
, CreateTimestamp(createTs)
117119
, LastHeartbeat(std::move(heartbeat))
118120
{
119121
}
120122

121-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
123+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
122124
: SeqNo(seqNo)
125+
, MinSeqNo(minSeqNo)
123126
, Offset(offset)
124127
, CreateTimestamp(createTs)
125128
, Explicit(true)
@@ -133,6 +136,9 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<
133136
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
134137
auto copy = *this;
135138
copy.SeqNo = seqNo;
139+
if (copy.MinSeqNo == 0 || copy.MinSeqNo > seqNo) {
140+
copy.MinSeqNo = seqNo;
141+
}
136142
copy.Offset = offset;
137143
copy.WriteTimestamp = writeTs;
138144

@@ -178,6 +184,7 @@ void TSourceIdInfo::Serialize(TBuffer& data) const {
178184
TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
179185
TSourceIdInfo result;
180186
result.SeqNo = proto.GetSeqNo();
187+
result.MinSeqNo = proto.GetMinSeqNo();
181188
result.Offset = proto.GetOffset();
182189
result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp());
183190
result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp());
@@ -197,6 +204,7 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
197204

198205
void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
199206
proto.SetSeqNo(SeqNo);
207+
proto.SetMinSeqNo(MinSeqNo);
200208
proto.SetOffset(Offset);
201209
proto.SetWriteTimestamp(WriteTimestamp.GetValue());
202210
proto.SetCreateTimestamp(CreateTimestamp.GetValue());

ydb/core/persqueue/sourceid.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct TSourceIdInfo {
2424
};
2525

2626
ui64 SeqNo = 0;
27+
ui64 MinSeqNo = 0;
2728
ui64 Offset = 0;
2829
TInstant WriteTimestamp;
2930
TInstant CreateTimestamp;
@@ -33,9 +34,9 @@ struct TSourceIdInfo {
3334
EState State = EState::Registered;
3435

3536
TSourceIdInfo() = default;
36-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
37-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
38-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
37+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs);
38+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
39+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
3940

4041
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
4142
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;

ydb/core/persqueue/ut/sourceid_ut.cpp

+48-16
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
2323
TSourceIdWriter writer(ESourceIdFormat::Raw);
2424

2525
const auto sourceId = TestSourceId(1);
26-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
26+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
2727

2828
writer.RegisterSourceId(sourceId, sourceIdInfo);
2929
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
@@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
3535
}
3636

3737
const auto anotherSourceId = TestSourceId(2);
38-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
38+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
3939
UNIT_ASSERT_VALUES_UNEQUAL(sourceIdInfo, anotherSourceIdInfo);
4040

4141
{
@@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
4747
Y_UNIT_TEST(SourceIdWriterClean) {
4848
TSourceIdWriter writer(ESourceIdFormat::Raw);
4949

50-
writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100));
50+
writer.RegisterSourceId(TestSourceId(), 1, 0, 10, TInstant::Seconds(100));
5151
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
5252

5353
writer.Clear();
@@ -60,7 +60,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
6060
auto expectedRequest = MakeHolder<TEvKeyValue::TEvRequest>();
6161

6262
const auto sourceId = TestSourceId(1);
63-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
63+
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
6464
writer.RegisterSourceId(sourceId, sourceIdInfo);
6565
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
6666
{
@@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
7878
}
7979

8080
const auto anotherSourceId = TestSourceId(2);
81-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
81+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
8282
writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
8383
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2);
8484
{
@@ -102,9 +102,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
102102
TSourceIdStorage storage;
103103

104104
const auto sourceId = TestSourceId(1);
105-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
105+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
106106
const auto anotherSourceId = TestSourceId(2);
107-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
107+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
108108

109109
storage.RegisterSourceId(sourceId, sourceIdInfo);
110110
UNIT_ASSERT_VALUES_EQUAL(storage.GetInMemorySourceIds().size(), 1);
@@ -130,7 +130,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
130130

131131
void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) {
132132
const auto sourceId = TestSourceId();
133-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
133+
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
134134

135135
TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark);
136136
TBuffer idata;
@@ -162,20 +162,20 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
162162
TSourceIdStorage storage;
163163

164164
const auto sourceId = TestSourceId(1);
165-
storage.RegisterSourceId(sourceId, 1, 10, TInstant::Seconds(100));
165+
storage.RegisterSourceId(sourceId, 1, 0, 10, TInstant::Seconds(100));
166166
{
167167
auto ds = storage.MinAvailableTimestamp(now);
168168
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
169169
}
170170

171171
const auto anotherSourceId = TestSourceId(2);
172-
storage.RegisterSourceId(anotherSourceId, 2, 20, TInstant::Seconds(200));
172+
storage.RegisterSourceId(anotherSourceId, 2, 0, 20, TInstant::Seconds(200));
173173
{
174174
auto ds = storage.MinAvailableTimestamp(now);
175175
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
176176
}
177177

178-
storage.RegisterSourceId(sourceId, 3, 30, TInstant::Seconds(300));
178+
storage.RegisterSourceId(sourceId, 3, 0, 30, TInstant::Seconds(300));
179179
{
180180
auto ds = storage.MinAvailableTimestamp(now);
181181
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200));
@@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
185185
Y_UNIT_TEST(SourceIdStorageTestClean) {
186186
TSourceIdStorage storage;
187187
for (ui64 i = 1; i <= 10000; ++i) {
188-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
188+
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
189189
}
190190

191191
NKikimrPQ::TPartitionConfig config;
@@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
226226
Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) {
227227
TSourceIdStorage storage;
228228
for (ui64 i = 1; i <= 10000; ++i) {
229-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
229+
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
230230
}
231231

232232
NKikimrPQ::TPartitionConfig config;
@@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
258258
Y_UNIT_TEST(SourceIdStorageComplexDelete) {
259259
TSourceIdStorage storage;
260260
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
261-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
261+
storage.RegisterSourceId(TestSourceId(i), i, 1, i , TInstant::Seconds(10 * i));
262262
}
263263

264264
NKikimrPQ::TPartitionConfig config;
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
297297
const auto sourceId = TestSourceId(i);
298298
const auto owner = TestOwner(sourceId);
299299

300-
storage.RegisterSourceId(sourceId, i, i, TInstant::Hours(i));
300+
storage.RegisterSourceId(sourceId, i, 0, i, TInstant::Hours(i));
301301
storage.RegisterSourceIdOwner(sourceId, owner);
302302
owners[owner];
303303
}
@@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
324324
}
325325

326326
inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe<THeartbeat>& heartbeat = Nothing()) {
327-
auto info = TSourceIdInfo(0, offset, TInstant::Now());
327+
auto info = TSourceIdInfo(0, 0, offset, TInstant::Now());
328328

329329
info.Explicit = true;
330330
if (heartbeat) {
@@ -460,6 +460,38 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
460460
}
461461
}
462462

463+
Y_UNIT_TEST(SourceIdMinSeqNo) {
464+
TSourceIdStorage storage;
465+
466+
const auto sourceId = TestSourceId(1);
467+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
468+
const auto anotherSourceId = TestSourceId(2);
469+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 1, 20, TInstant::Seconds(200));
470+
471+
storage.RegisterSourceId(sourceId, sourceIdInfo);
472+
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
473+
{
474+
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
475+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
476+
}
477+
478+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 11, TInstant::Seconds(100)));
479+
{
480+
auto it = storage.GetInMemorySourceIds().find(sourceId);
481+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
482+
}
483+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(1, 12, TInstant::Seconds(100)));
484+
{
485+
auto it = storage.GetInMemorySourceIds().find(sourceId);
486+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
487+
}
488+
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(3, 12, TInstant::Seconds(100)));
489+
{
490+
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
491+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
492+
}
493+
}
494+
463495
} // TSourceIdTests
464496

465497
} // namespace NKikimr::NPQ

ydb/core/protos/pqconfig.proto

+1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ message TMessageGroupInfo {
377377
}
378378

379379
optional uint64 SeqNo = 1;
380+
optional uint64 MinSeqNo = 9;
380381
optional uint64 Offset = 2;
381382
optional uint64 WriteTimestamp = 3; // TInstant::TValue
382383
optional uint64 CreateTimestamp = 4; // TInstant::TValue

0 commit comments

Comments
 (0)