Skip to content

Commit 6026e60

Browse files
LOGBROKER-8894
1 parent 7bc2f70 commit 6026e60

8 files changed

+79
-40
lines changed

ydb/core/persqueue/partition_sourcemanager.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager
104104
TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
105105
TPartitionSourceManager::TSourceInfo result(value.State);
106106
result.SeqNo = value.SeqNo;
107+
result.MinSeqNo = value.MinSeqNo;
107108
result.Offset = value.Offset;
108109
result.Explicit = value.Explicit;
109110
result.WriteTimestamp = value.WriteTimestamp;
@@ -145,11 +146,11 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons
145146
return InWriter == WriteStorage().end() ? std::nullopt : std::optional(InWriter->second.SeqNo);
146147
}
147148

148-
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
149+
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp) {
149150
if (InMemory == MemoryStorage().end()) {
150-
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
151+
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, minSeqNo, offset, timestamp);
151152
} else {
152-
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
153+
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, minSeqNo, offset, timestamp));
153154
}
154155
}
155156

ydb/core/persqueue/partition_sourcemanager.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class TPartitionSourceManager {
2525

2626
TSourceIdInfo::EState State;
2727
ui64 SeqNo = 0;
28+
ui64 MinSeqNo = 0;
2829
ui64 Offset = 0;
2930
bool Explicit = false;
3031
TInstant WriteTimestamp;
@@ -45,7 +46,7 @@ class TPartitionSourceManager {
4546
std::optional<ui64> CommittedSeqNo() const;
4647
std::optional<ui64> UpdatedSeqNo() const;
4748

48-
void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
49+
void Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp);
4950
void Update(THeartbeat&& heartbeat);
5051

5152
private:

ydb/core/persqueue/partition_write.cpp

+8-8
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
327327
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
328328
Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion);
329329
TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
330-
SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp);
330+
SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp);
331331
} else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) {
332332
SourceIdStorage.RegisterSourceId(s, it->second.Updated(
333-
seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
333+
seqNo, seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
334334
));
335335
} else {
336-
SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, offset, CurrentTimestamp));
336+
SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, seqNo, offset, CurrentTimestamp));
337337
}
338338

339339
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
@@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
378378
}
379379

380380
Y_ABORT_UNLESS(body.AssignedOffset);
381-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
381+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
382382
ReplyOk(ctx, response.GetCookie());
383383
} else if (response.IsDeregisterMessageGroup()) {
384384
const auto& body = response.GetDeregisterMessageGroup().Body;
@@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
399399
}
400400

401401
Y_ABORT_UNLESS(body.AssignedOffset);
402-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
402+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
403403
}
404404

405405
ReplyOk(ctx, response.GetCookie());
@@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m
836836
}
837837

838838
body.AssignedOffset = parameters.CurOffset;
839-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
839+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
840840

841841
return ProcessResult::Continue;
842842
}
@@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg,
859859
}
860860

861861
body.AssignedOffset = parameters.CurOffset;
862-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
862+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
863863
}
864864

865865
return ProcessResult::Continue;
@@ -1132,7 +1132,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
11321132
<< " NewHead: " << NewHead
11331133
);
11341134

1135-
sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp);
1135+
sourceId.Update(p.Msg.SeqNo, p.Msg.SeqNo, curOffset, CurrentTimestamp);
11361136

11371137
++curOffset;
11381138
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);

ydb/core/persqueue/pq_impl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
15001500
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
15011501
}
15021502

1503-
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
1503+
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange));
15041504
}
15051505

15061506
for (const auto& partition : cfg.GetPartitions()) {

ydb/core/persqueue/sourceid.cpp

+14-6
Original file line numberDiff line numberDiff line change
@@ -101,25 +101,28 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
101101
SourceIdsWithHeartbeat.erase(sourceId);
102102
}
103103

104-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
104+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs)
105105
: SeqNo(seqNo)
106+
, MinSeqNo(minSeqNo)
106107
, Offset(offset)
107108
, WriteTimestamp(createTs)
108109
, CreateTimestamp(createTs)
109110
{
110111
}
111112

112-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
113+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
113114
: SeqNo(seqNo)
115+
, MinSeqNo(minSeqNo)
114116
, Offset(offset)
115117
, WriteTimestamp(createTs)
116118
, CreateTimestamp(createTs)
117119
, LastHeartbeat(std::move(heartbeat))
118120
{
119121
}
120122

121-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
123+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
122124
: SeqNo(seqNo)
125+
, MinSeqNo(minSeqNo)
123126
, Offset(offset)
124127
, CreateTimestamp(createTs)
125128
, Explicit(true)
@@ -130,17 +133,20 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<
130133
}
131134
}
132135

133-
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
136+
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const {
134137
auto copy = *this;
135138
copy.SeqNo = seqNo;
139+
if (minSeqNo) {
140+
copy.MinSeqNo = minSeqNo;
141+
}
136142
copy.Offset = offset;
137143
copy.WriteTimestamp = writeTs;
138144

139145
return copy;
140146
}
141147

142-
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const {
143-
auto copy = Updated(seqNo, offset, writeTs);
148+
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const {
149+
auto copy = Updated(seqNo, minSeqNo, offset, writeTs);
144150
copy.LastHeartbeat = std::move(heartbeat);
145151

146152
return copy;
@@ -178,6 +184,7 @@ void TSourceIdInfo::Serialize(TBuffer& data) const {
178184
TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
179185
TSourceIdInfo result;
180186
result.SeqNo = proto.GetSeqNo();
187+
result.MinSeqNo = proto.GetMinSeqNo();
181188
result.Offset = proto.GetOffset();
182189
result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp());
183190
result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp());
@@ -197,6 +204,7 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
197204

198205
void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
199206
proto.SetSeqNo(SeqNo);
207+
proto.SetMinSeqNo(MinSeqNo);
200208
proto.SetOffset(Offset);
201209
proto.SetWriteTimestamp(WriteTimestamp.GetValue());
202210
proto.SetCreateTimestamp(CreateTimestamp.GetValue());

ydb/core/persqueue/sourceid.h

+6-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct TSourceIdInfo {
2424
};
2525

2626
ui64 SeqNo = 0;
27+
ui64 MinSeqNo = 0;
2728
ui64 Offset = 0;
2829
TInstant WriteTimestamp;
2930
TInstant CreateTimestamp;
@@ -33,12 +34,12 @@ struct TSourceIdInfo {
3334
EState State = EState::Registered;
3435

3536
TSourceIdInfo() = default;
36-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
37-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
38-
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
37+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs);
38+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
39+
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
3940

40-
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
41-
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
41+
TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const;
42+
TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
4243

4344
static EState ConvertState(NKikimrPQ::TMessageGroupInfo::EState value);
4445
static NKikimrPQ::TMessageGroupInfo::EState ConvertState(EState value);

ydb/core/persqueue/ut/sourceid_ut.cpp

+43-16
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
2323
TSourceIdWriter writer(ESourceIdFormat::Raw);
2424

2525
const auto sourceId = TestSourceId(1);
26-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
26+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
2727

2828
writer.RegisterSourceId(sourceId, sourceIdInfo);
2929
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
@@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
3535
}
3636

3737
const auto anotherSourceId = TestSourceId(2);
38-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
38+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
3939
UNIT_ASSERT_VALUES_UNEQUAL(sourceIdInfo, anotherSourceIdInfo);
4040

4141
{
@@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
4747
Y_UNIT_TEST(SourceIdWriterClean) {
4848
TSourceIdWriter writer(ESourceIdFormat::Raw);
4949

50-
writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100));
50+
writer.RegisterSourceId(TestSourceId(), 1, 0, 10, TInstant::Seconds(100));
5151
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
5252

5353
writer.Clear();
@@ -60,7 +60,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
6060
auto expectedRequest = MakeHolder<TEvKeyValue::TEvRequest>();
6161

6262
const auto sourceId = TestSourceId(1);
63-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
63+
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
6464
writer.RegisterSourceId(sourceId, sourceIdInfo);
6565
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
6666
{
@@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
7878
}
7979

8080
const auto anotherSourceId = TestSourceId(2);
81-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
81+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
8282
writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
8383
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2);
8484
{
@@ -102,9 +102,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
102102
TSourceIdStorage storage;
103103

104104
const auto sourceId = TestSourceId(1);
105-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
105+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
106106
const auto anotherSourceId = TestSourceId(2);
107-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
107+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
108108

109109
storage.RegisterSourceId(sourceId, sourceIdInfo);
110110
UNIT_ASSERT_VALUES_EQUAL(storage.GetInMemorySourceIds().size(), 1);
@@ -130,7 +130,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
130130

131131
void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) {
132132
const auto sourceId = TestSourceId();
133-
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
133+
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
134134

135135
TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark);
136136
TBuffer idata;
@@ -162,20 +162,20 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
162162
TSourceIdStorage storage;
163163

164164
const auto sourceId = TestSourceId(1);
165-
storage.RegisterSourceId(sourceId, 1, 10, TInstant::Seconds(100));
165+
storage.RegisterSourceId(sourceId, 1, 0, 10, TInstant::Seconds(100));
166166
{
167167
auto ds = storage.MinAvailableTimestamp(now);
168168
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
169169
}
170170

171171
const auto anotherSourceId = TestSourceId(2);
172-
storage.RegisterSourceId(anotherSourceId, 2, 20, TInstant::Seconds(200));
172+
storage.RegisterSourceId(anotherSourceId, 2, 0, 20, TInstant::Seconds(200));
173173
{
174174
auto ds = storage.MinAvailableTimestamp(now);
175175
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
176176
}
177177

178-
storage.RegisterSourceId(sourceId, 3, 30, TInstant::Seconds(300));
178+
storage.RegisterSourceId(sourceId, 3, 0, 30, TInstant::Seconds(300));
179179
{
180180
auto ds = storage.MinAvailableTimestamp(now);
181181
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200));
@@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
185185
Y_UNIT_TEST(SourceIdStorageTestClean) {
186186
TSourceIdStorage storage;
187187
for (ui64 i = 1; i <= 10000; ++i) {
188-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
188+
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
189189
}
190190

191191
NKikimrPQ::TPartitionConfig config;
@@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
226226
Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) {
227227
TSourceIdStorage storage;
228228
for (ui64 i = 1; i <= 10000; ++i) {
229-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
229+
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
230230
}
231231

232232
NKikimrPQ::TPartitionConfig config;
@@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
258258
Y_UNIT_TEST(SourceIdStorageComplexDelete) {
259259
TSourceIdStorage storage;
260260
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
261-
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
261+
storage.RegisterSourceId(TestSourceId(i), i, 1, i , TInstant::Seconds(10 * i));
262262
}
263263

264264
NKikimrPQ::TPartitionConfig config;
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
297297
const auto sourceId = TestSourceId(i);
298298
const auto owner = TestOwner(sourceId);
299299

300-
storage.RegisterSourceId(sourceId, i, i, TInstant::Hours(i));
300+
storage.RegisterSourceId(sourceId, i, 0, i, TInstant::Hours(i));
301301
storage.RegisterSourceIdOwner(sourceId, owner);
302302
owners[owner];
303303
}
@@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
324324
}
325325

326326
inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe<THeartbeat>& heartbeat = Nothing()) {
327-
auto info = TSourceIdInfo(0, offset, TInstant::Now());
327+
auto info = TSourceIdInfo(0, 0, offset, TInstant::Now());
328328

329329
info.Explicit = true;
330330
if (heartbeat) {
@@ -460,6 +460,33 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
460460
}
461461
}
462462

463+
Y_UNIT_TEST(SourceIdMinSeqNo) {
464+
TSourceIdStorage storage;
465+
466+
const auto sourceId = TestSourceId(1);
467+
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
468+
const auto anotherSourceId = TestSourceId(2);
469+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 1, 20, TInstant::Seconds(200));
470+
471+
storage.RegisterSourceId(sourceId, sourceIdInfo);
472+
storage.RegisterSourceId(sourceId, anotherSourceIdInfo);
473+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 2, 11, TInstant::Seconds(100)));
474+
{
475+
auto it = storage.GetInMemorySourceIds().find(sourceId);
476+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
477+
}
478+
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 1, 12, TInstant::Seconds(100)));
479+
{
480+
auto it = storage.GetInMemorySourceIds().find(sourceId);
481+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
482+
}
483+
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(2, 0, 12, TInstant::Seconds(100)));
484+
{
485+
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
486+
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
487+
}
488+
}
489+
463490
} // TSourceIdTests
464491

465492
} // namespace NKikimr::NPQ

ydb/core/protos/pqconfig.proto

+1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ message TMessageGroupInfo {
377377
}
378378

379379
optional uint64 SeqNo = 1;
380+
optional uint64 MinSeqNo = 9;
380381
optional uint64 Offset = 2;
381382
optional uint64 WriteTimestamp = 3; // TInstant::TValue
382383
optional uint64 CreateTimestamp = 4; // TInstant::TValue

0 commit comments

Comments
 (0)