Skip to content

Commit 2624f93

Browse files
Revert "LOGBROKER-8894"
This reverts commit 6026e60.
1 parent 4f50eb4 commit 2624f93

8 files changed

+40
-79
lines changed

ydb/core/persqueue/partition_sourcemanager.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ TPartitionSourceManager& TPartitionSourceManager::TModificationBatch::GetManager
104104
TPartitionSourceManager::TSourceInfo Convert(TSourceIdInfo value) {
105105
TPartitionSourceManager::TSourceInfo result(value.State);
106106
result.SeqNo = value.SeqNo;
107-
result.MinSeqNo = value.MinSeqNo;
108107
result.Offset = value.Offset;
109108
result.Explicit = value.Explicit;
110109
result.WriteTimestamp = value.WriteTimestamp;
@@ -146,11 +145,11 @@ std::optional<ui64> TPartitionSourceManager::TSourceManager::UpdatedSeqNo() cons
146145
return InWriter == WriteStorage().end() ? std::nullopt : std::optional(InWriter->second.SeqNo);
147146
}
148147

149-
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp) {
148+
void TPartitionSourceManager::TSourceManager::Update(ui64 seqNo, ui64 offset, TInstant timestamp) {
150149
if (InMemory == MemoryStorage().end()) {
151-
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, minSeqNo, offset, timestamp);
150+
Batch.SourceIdWriter.RegisterSourceId(SourceId, seqNo, offset, timestamp);
152151
} else {
153-
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, minSeqNo, offset, timestamp));
152+
Batch.SourceIdWriter.RegisterSourceId(SourceId, InMemory->second.Updated(seqNo, offset, timestamp));
154153
}
155154
}
156155

ydb/core/persqueue/partition_sourcemanager.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ class TPartitionSourceManager {
2525

2626
TSourceIdInfo::EState State;
2727
ui64 SeqNo = 0;
28-
ui64 MinSeqNo = 0;
2928
ui64 Offset = 0;
3029
bool Explicit = false;
3130
TInstant WriteTimestamp;
@@ -46,7 +45,7 @@ class TPartitionSourceManager {
4645
std::optional<ui64> CommittedSeqNo() const;
4746
std::optional<ui64> UpdatedSeqNo() const;
4847

49-
void Update(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant timestamp);
48+
void Update(ui64 seqNo, ui64 offset, TInstant timestamp);
5049
void Update(THeartbeat&& heartbeat);
5150

5251
private:

ydb/core/persqueue/partition_write.cpp

+8-8
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
327327
if (it == SourceIdStorage.GetInMemorySourceIds().end()) {
328328
Y_ABORT_UNLESS(!writeResponse.Msg.HeartbeatVersion);
329329
TabletCounters.Cumulative()[COUNTER_PQ_SID_CREATED].Increment(1);
330-
SourceIdStorage.RegisterSourceId(s, seqNo, 0, offset, CurrentTimestamp);
330+
SourceIdStorage.RegisterSourceId(s, seqNo, offset, CurrentTimestamp);
331331
} else if (const auto& hbVersion = writeResponse.Msg.HeartbeatVersion) {
332332
SourceIdStorage.RegisterSourceId(s, it->second.Updated(
333-
seqNo, seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
333+
seqNo, offset, CurrentTimestamp, THeartbeat{*hbVersion, writeResponse.Msg.Data}
334334
));
335335
} else {
336-
SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, seqNo, offset, CurrentTimestamp));
336+
SourceIdStorage.RegisterSourceId(s, it->second.Updated(seqNo, offset, CurrentTimestamp));
337337
}
338338

339339
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_OK].Increment(1);
@@ -378,7 +378,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
378378
}
379379

380380
Y_ABORT_UNLESS(body.AssignedOffset);
381-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
381+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange));
382382
ReplyOk(ctx, response.GetCookie());
383383
} else if (response.IsDeregisterMessageGroup()) {
384384
const auto& body = response.GetDeregisterMessageGroup().Body;
@@ -399,7 +399,7 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
399399
}
400400

401401
Y_ABORT_UNLESS(body.AssignedOffset);
402-
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, 0, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
402+
SourceIdStorage.RegisterSourceId(body.SourceId, body.SeqNo, *body.AssignedOffset, CurrentTimestamp, std::move(keyRange), true);
403403
}
404404

405405
ReplyOk(ctx, response.GetCookie());
@@ -836,7 +836,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TRegisterMessageGroupMsg& m
836836
}
837837

838838
body.AssignedOffset = parameters.CurOffset;
839-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
839+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange));
840840

841841
return ProcessResult::Continue;
842842
}
@@ -859,7 +859,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TSplitMessageGroupMsg& msg,
859859
}
860860

861861
body.AssignedOffset = parameters.CurOffset;
862-
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, 0, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
862+
parameters.SourceIdBatch.RegisterSourceId(body.SourceId, body.SeqNo, parameters.CurOffset, CurrentTimestamp, std::move(keyRange), true);
863863
}
864864

865865
return ProcessResult::Continue;
@@ -1132,7 +1132,7 @@ TPartition::ProcessResult TPartition::ProcessRequest(TWriteMsg& p, ProcessParame
11321132
<< " NewHead: " << NewHead
11331133
);
11341134

1135-
sourceId.Update(p.Msg.SeqNo, p.Msg.SeqNo, curOffset, CurrentTimestamp);
1135+
sourceId.Update(p.Msg.SeqNo, curOffset, CurrentTimestamp);
11361136

11371137
++curOffset;
11381138
PartitionedBlob = TPartitionedBlob(Partition, 0, "", 0, 0, 0, Head, NewHead, true, false, MaxBlobSize);

ydb/core/persqueue/pq_impl.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1500,7 +1500,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
15001500
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
15011501
}
15021502

1503-
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, 0, ctx.Now(), std::move(keyRange));
1503+
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
15041504
}
15051505

15061506
for (const auto& partition : cfg.GetPartitions()) {

ydb/core/persqueue/sourceid.cpp

+6-14
Original file line numberDiff line numberDiff line change
@@ -101,28 +101,25 @@ void THeartbeatProcessor::ForgetSourceId(const TString& sourceId) {
101101
SourceIdsWithHeartbeat.erase(sourceId);
102102
}
103103

104-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs)
104+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs)
105105
: SeqNo(seqNo)
106-
, MinSeqNo(minSeqNo)
107106
, Offset(offset)
108107
, WriteTimestamp(createTs)
109108
, CreateTimestamp(createTs)
110109
{
111110
}
112111

113-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
112+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat)
114113
: SeqNo(seqNo)
115-
, MinSeqNo(minSeqNo)
116114
, Offset(offset)
117115
, WriteTimestamp(createTs)
118116
, CreateTimestamp(createTs)
119117
, LastHeartbeat(std::move(heartbeat))
120118
{
121119
}
122120

123-
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
121+
TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit)
124122
: SeqNo(seqNo)
125-
, MinSeqNo(minSeqNo)
126123
, Offset(offset)
127124
, CreateTimestamp(createTs)
128125
, Explicit(true)
@@ -133,20 +130,17 @@ TSourceIdInfo::TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant cr
133130
}
134131
}
135132

136-
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const {
133+
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const {
137134
auto copy = *this;
138135
copy.SeqNo = seqNo;
139-
if (minSeqNo) {
140-
copy.MinSeqNo = minSeqNo;
141-
}
142136
copy.Offset = offset;
143137
copy.WriteTimestamp = writeTs;
144138

145139
return copy;
146140
}
147141

148-
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const {
149-
auto copy = Updated(seqNo, minSeqNo, offset, writeTs);
142+
TSourceIdInfo TSourceIdInfo::Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const {
143+
auto copy = Updated(seqNo, offset, writeTs);
150144
copy.LastHeartbeat = std::move(heartbeat);
151145

152146
return copy;
@@ -184,7 +178,6 @@ void TSourceIdInfo::Serialize(TBuffer& data) const {
184178
TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
185179
TSourceIdInfo result;
186180
result.SeqNo = proto.GetSeqNo();
187-
result.MinSeqNo = proto.GetMinSeqNo();
188181
result.Offset = proto.GetOffset();
189182
result.WriteTimestamp = TInstant::FromValue(proto.GetWriteTimestamp());
190183
result.CreateTimestamp = TInstant::FromValue(proto.GetCreateTimestamp());
@@ -204,7 +197,6 @@ TSourceIdInfo TSourceIdInfo::Parse(const NKikimrPQ::TMessageGroupInfo& proto) {
204197

205198
void TSourceIdInfo::Serialize(NKikimrPQ::TMessageGroupInfo& proto) const {
206199
proto.SetSeqNo(SeqNo);
207-
proto.SetMinSeqNo(MinSeqNo);
208200
proto.SetOffset(Offset);
209201
proto.SetWriteTimestamp(WriteTimestamp.GetValue());
210202
proto.SetCreateTimestamp(CreateTimestamp.GetValue());

ydb/core/persqueue/sourceid.h

+5-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ struct TSourceIdInfo {
2424
};
2525

2626
ui64 SeqNo = 0;
27-
ui64 MinSeqNo = 0;
2827
ui64 Offset = 0;
2928
TInstant WriteTimestamp;
3029
TInstant CreateTimestamp;
@@ -34,12 +33,12 @@ struct TSourceIdInfo {
3433
EState State = EState::Registered;
3534

3635
TSourceIdInfo() = default;
37-
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs);
38-
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
39-
TSourceIdInfo(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
36+
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs);
37+
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, THeartbeat&& heartbeat);
38+
TSourceIdInfo(ui64 seqNo, ui64 offset, TInstant createTs, TMaybe<TPartitionKeyRange>&& keyRange, bool isInSplit = false);
4039

41-
TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs) const;
42-
TSourceIdInfo Updated(ui64 seqNo, ui64 minSeqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
40+
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs) const;
41+
TSourceIdInfo Updated(ui64 seqNo, ui64 offset, TInstant writeTs, THeartbeat&& heartbeat) const;
4342

4443
static EState ConvertState(NKikimrPQ::TMessageGroupInfo::EState value);
4544
static NKikimrPQ::TMessageGroupInfo::EState ConvertState(EState value);

ydb/core/persqueue/ut/sourceid_ut.cpp

+16-43
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
2323
TSourceIdWriter writer(ESourceIdFormat::Raw);
2424

2525
const auto sourceId = TestSourceId(1);
26-
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
26+
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
2727

2828
writer.RegisterSourceId(sourceId, sourceIdInfo);
2929
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
@@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
3535
}
3636

3737
const auto anotherSourceId = TestSourceId(2);
38-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
38+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
3939
UNIT_ASSERT_VALUES_UNEQUAL(sourceIdInfo, anotherSourceIdInfo);
4040

4141
{
@@ -47,7 +47,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
4747
Y_UNIT_TEST(SourceIdWriterClean) {
4848
TSourceIdWriter writer(ESourceIdFormat::Raw);
4949

50-
writer.RegisterSourceId(TestSourceId(), 1, 0, 10, TInstant::Seconds(100));
50+
writer.RegisterSourceId(TestSourceId(), 1, 10, TInstant::Seconds(100));
5151
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
5252

5353
writer.Clear();
@@ -60,7 +60,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
6060
auto expectedRequest = MakeHolder<TEvKeyValue::TEvRequest>();
6161

6262
const auto sourceId = TestSourceId(1);
63-
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
63+
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
6464
writer.RegisterSourceId(sourceId, sourceIdInfo);
6565
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 1);
6666
{
@@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
7878
}
7979

8080
const auto anotherSourceId = TestSourceId(2);
81-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
81+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
8282
writer.RegisterSourceId(anotherSourceId, anotherSourceIdInfo);
8383
UNIT_ASSERT_VALUES_EQUAL(writer.GetSourceIdsToWrite().size(), 2);
8484
{
@@ -102,9 +102,9 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
102102
TSourceIdStorage storage;
103103

104104
const auto sourceId = TestSourceId(1);
105-
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
105+
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
106106
const auto anotherSourceId = TestSourceId(2);
107-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 0, 20, TInstant::Seconds(200));
107+
const auto anotherSourceIdInfo = TSourceIdInfo(2, 20, TInstant::Seconds(200));
108108

109109
storage.RegisterSourceId(sourceId, sourceIdInfo);
110110
UNIT_ASSERT_VALUES_EQUAL(storage.GetInMemorySourceIds().size(), 1);
@@ -130,7 +130,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
130130

131131
void SourceIdStorageParseAndAdd(TKeyPrefix::EMark mark, ESourceIdFormat format) {
132132
const auto sourceId = TestSourceId();
133-
const auto sourceIdInfo = TSourceIdInfo(1, 1, 10, TInstant::Seconds(100));
133+
const auto sourceIdInfo = TSourceIdInfo(1, 10, TInstant::Seconds(100));
134134

135135
TKeyPrefix ikey(TKeyPrefix::TypeInfo, TPartitionId(TestPartition), mark);
136136
TBuffer idata;
@@ -162,20 +162,20 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
162162
TSourceIdStorage storage;
163163

164164
const auto sourceId = TestSourceId(1);
165-
storage.RegisterSourceId(sourceId, 1, 0, 10, TInstant::Seconds(100));
165+
storage.RegisterSourceId(sourceId, 1, 10, TInstant::Seconds(100));
166166
{
167167
auto ds = storage.MinAvailableTimestamp(now);
168168
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
169169
}
170170

171171
const auto anotherSourceId = TestSourceId(2);
172-
storage.RegisterSourceId(anotherSourceId, 2, 0, 20, TInstant::Seconds(200));
172+
storage.RegisterSourceId(anotherSourceId, 2, 20, TInstant::Seconds(200));
173173
{
174174
auto ds = storage.MinAvailableTimestamp(now);
175175
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(100));
176176
}
177177

178-
storage.RegisterSourceId(sourceId, 3, 0, 30, TInstant::Seconds(300));
178+
storage.RegisterSourceId(sourceId, 3, 30, TInstant::Seconds(300));
179179
{
180180
auto ds = storage.MinAvailableTimestamp(now);
181181
UNIT_ASSERT_VALUES_EQUAL(ds, TInstant::Seconds(200));
@@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
185185
Y_UNIT_TEST(SourceIdStorageTestClean) {
186186
TSourceIdStorage storage;
187187
for (ui64 i = 1; i <= 10000; ++i) {
188-
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
188+
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
189189
}
190190

191191
NKikimrPQ::TPartitionConfig config;
@@ -226,7 +226,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
226226
Y_UNIT_TEST(SourceIdStorageDeleteByMaxCount) {
227227
TSourceIdStorage storage;
228228
for (ui64 i = 1; i <= 10000; ++i) {
229-
storage.RegisterSourceId(TestSourceId(i), i, 0, i, TInstant::Seconds(10 * i));
229+
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
230230
}
231231

232232
NKikimrPQ::TPartitionConfig config;
@@ -258,7 +258,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
258258
Y_UNIT_TEST(SourceIdStorageComplexDelete) {
259259
TSourceIdStorage storage;
260260
for (ui64 i = 1; i <= 10000 + 1; ++i) { // add 10000 + one extra sources
261-
storage.RegisterSourceId(TestSourceId(i), i, 1, i , TInstant::Seconds(10 * i));
261+
storage.RegisterSourceId(TestSourceId(i), i, i, TInstant::Seconds(10 * i));
262262
}
263263

264264
NKikimrPQ::TPartitionConfig config;
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
297297
const auto sourceId = TestSourceId(i);
298298
const auto owner = TestOwner(sourceId);
299299

300-
storage.RegisterSourceId(sourceId, i, 0, i, TInstant::Hours(i));
300+
storage.RegisterSourceId(sourceId, i, i, TInstant::Hours(i));
301301
storage.RegisterSourceIdOwner(sourceId, owner);
302302
owners[owner];
303303
}
@@ -324,7 +324,7 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
324324
}
325325

326326
inline static TSourceIdInfo MakeExplicitSourceIdInfo(ui64 offset, const TMaybe<THeartbeat>& heartbeat = Nothing()) {
327-
auto info = TSourceIdInfo(0, 0, offset, TInstant::Now());
327+
auto info = TSourceIdInfo(0, offset, TInstant::Now());
328328

329329
info.Explicit = true;
330330
if (heartbeat) {
@@ -460,33 +460,6 @@ Y_UNIT_TEST_SUITE(TSourceIdTests) {
460460
}
461461
}
462462

463-
Y_UNIT_TEST(SourceIdMinSeqNo) {
464-
TSourceIdStorage storage;
465-
466-
const auto sourceId = TestSourceId(1);
467-
const auto sourceIdInfo = TSourceIdInfo(1, 0, 10, TInstant::Seconds(100));
468-
const auto anotherSourceId = TestSourceId(2);
469-
const auto anotherSourceIdInfo = TSourceIdInfo(2, 1, 20, TInstant::Seconds(200));
470-
471-
storage.RegisterSourceId(sourceId, sourceIdInfo);
472-
storage.RegisterSourceId(sourceId, anotherSourceIdInfo);
473-
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 2, 11, TInstant::Seconds(100)));
474-
{
475-
auto it = storage.GetInMemorySourceIds().find(sourceId);
476-
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 2);
477-
}
478-
storage.RegisterSourceId(sourceId, sourceIdInfo.Updated(2, 1, 12, TInstant::Seconds(100)));
479-
{
480-
auto it = storage.GetInMemorySourceIds().find(sourceId);
481-
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
482-
}
483-
storage.RegisterSourceId(anotherSourceId, anotherSourceIdInfo.Updated(2, 0, 12, TInstant::Seconds(100)));
484-
{
485-
auto it = storage.GetInMemorySourceIds().find(anotherSourceId);
486-
UNIT_ASSERT_VALUES_EQUAL(it->second.MinSeqNo, 1);
487-
}
488-
}
489-
490463
} // TSourceIdTests
491464

492465
} // namespace NKikimr::NPQ

ydb/core/protos/pqconfig.proto

-1
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,6 @@ message TMessageGroupInfo {
377377
}
378378

379379
optional uint64 SeqNo = 1;
380-
optional uint64 MinSeqNo = 9;
381380
optional uint64 Offset = 2;
382381
optional uint64 WriteTimestamp = 3; // TInstant::TValue
383382
optional uint64 CreateTimestamp = 4; // TInstant::TValue

0 commit comments

Comments
 (0)