Skip to content

Commit 83cc4da

Browse files
authored
Support read from timestamp for topics autopartitioning (#12125)
1 parent 466cfa4 commit 83cc4da

21 files changed

+349
-69
lines changed

ydb/core/persqueue/blob.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ void TBatch::Pack() {
399399
Header.SetPayloadSize(PackedData.size());
400400
}
401401

402+
for (auto& b : Blobs) {
403+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
404+
}
405+
402406

403407
TVector<TClientBlob> tmp;
404408
Blobs.swap(tmp);
@@ -414,11 +418,14 @@ void TBatch::Unpack() {
414418
UnpackTo(&Blobs);
415419
Y_ABORT_UNLESS(InternalPartsPos.empty());
416420
for (ui32 i = 0; i < Blobs.size(); ++i) {
417-
if (!Blobs[i].IsLastPart())
421+
auto& b = Blobs[i];
422+
if (!b.IsLastPart()) {
418423
InternalPartsPos.push_back(i);
424+
}
425+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
419426
}
420427
Y_ABORT_UNLESS(InternalPartsPos.size() == GetInternalPartsCount());
421-
428+
422429
PackedData.Clear();
423430
}
424431

@@ -978,4 +985,3 @@ bool TPartitionedBlob::IsNextPart(const TString& sourceId, const ui64 seqNo, con
978985

979986
}// NPQ
980987
}// NKikimr
981-

ydb/core/persqueue/blob.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ struct TClientBlob {
101101
void SerializeTo(TBuffer& buffer) const;
102102
static TClientBlob Deserialize(const char *data, ui32 size);
103103

104-
static void CheckBlob(const TKey& key, const TString& blob);
104+
static void CheckBlob(const TKey& key, const TString& blob);
105105
};
106106

107107
static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
@@ -121,6 +121,7 @@ struct TBatch {
121121
TVector<ui32> InternalPartsPos;
122122
NKikimrPQ::TBatchHeader Header;
123123
TBuffer PackedData;
124+
TInstant EndWriteTimestamp;
124125

125126
TBatch()
126127
: Packed(false)
@@ -162,27 +163,42 @@ struct TBatch {
162163
Header.SetUnpackedSize(unpackedSize);
163164
Header.SetCount(count);
164165
Header.SetInternalPartsCount(InternalPartsPos.size());
166+
167+
EndWriteTimestamp = std::max(EndWriteTimestamp, b.WriteTimestamp);
165168
}
166169

167170
ui64 GetOffset() const {
168171
return Header.GetOffset();
169172
}
173+
170174
ui16 GetPartNo() const {
171175
return Header.GetPartNo();
172176
}
177+
173178
ui32 GetUnpackedSize() const {
174179
return Header.GetUnpackedSize();
175180
}
181+
176182
ui32 GetCount() const {
177183
return Header.GetCount();
178184
}
185+
179186
ui16 GetInternalPartsCount() const {
180187
return Header.GetInternalPartsCount();
181188
}
189+
182190
bool IsGreaterThan(ui64 offset, ui16 partNo) const {
183191
return GetOffset() > offset || GetOffset() == offset && GetPartNo() > partNo;
184192
}
185193

194+
bool Empty() const {
195+
return Blobs.empty();
196+
}
197+
198+
TInstant GetEndWriteTimestamp() const {
199+
return EndWriteTimestamp;
200+
}
201+
186202
TBatch(const NKikimrPQ::TBatchHeader &header, const char* data)
187203
: Packed(true)
188204
, Header(header)
@@ -239,7 +255,7 @@ struct THead {
239255
private:
240256
std::deque<TBatch> Batches;
241257
ui16 InternalPartsCount = 0;
242-
258+
243259
friend class TPartitionedBlob;
244260

245261
class TBatchAccessor {

ydb/core/persqueue/partition.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,14 @@ ui64 TPartition::ImportantClientsMinOffset() const {
313313
return minOffset;
314314
}
315315

316+
TInstant TPartition::GetEndWriteTimestamp() const {
317+
return EndWriteTimestamp;
318+
}
319+
320+
THead& TPartition::GetHead() {
321+
return Head;
322+
}
323+
316324
void TPartition::HandleWakeup(const TActorContext& ctx) {
317325
FilterDeadlinedWrites(ctx);
318326

@@ -356,6 +364,7 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
356364
meta.SetStartOffset(StartOffset);
357365
meta.SetEndOffset(Max(NewHead.GetNextOffset(), EndOffset));
358366
meta.SetSubDomainOutOfSpace(SubDomainOutOfSpace);
367+
meta.SetEndWriteTimestamp(PendingWriteTimestamp.MilliSeconds());
359368

360369
if (IsSupportive()) {
361370
auto* counterData = meta.MutableCounterData();

ydb/core/persqueue/partition.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
116116
friend TInitInfoRangeStep;
117117
friend TInitDataRangeStep;
118118
friend TInitDataStep;
119+
friend TInitEndWriteTimestampStep;
119120

120121
friend TPartitionSourceManager;
121122

@@ -440,6 +441,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
440441
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
441442
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
442443

444+
ui64 GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const;
443445

444446
public:
445447
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -470,6 +472,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
470472
// Minimal offset, the data from which cannot be deleted, because it is required by an important consumer
471473
ui64 ImportantClientsMinOffset() const;
472474

475+
TInstant GetEndWriteTimestamp() const; // For tests only
476+
THead& GetHead(); // For tests only
473477

474478
//Bootstrap sends kvRead
475479
//Become StateInit
@@ -666,6 +670,8 @@ class TPartition : public TActorBootstrapped<TPartition> {
666670
// [DataKeysBody ][DataKeysHead ]
667671
ui64 StartOffset;
668672
ui64 EndOffset;
673+
TInstant EndWriteTimestamp;
674+
TInstant PendingWriteTimestamp;
669675

670676
ui64 WriteInflightSize;
671677
TActorId Tablet;

ydb/core/persqueue/partition_init.cpp

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ TInitializer::TInitializer(TPartition* partition)
2828
Steps.push_back(MakeHolder<TInitInfoRangeStep>(this));
2929
Steps.push_back(MakeHolder<TInitDataRangeStep>(this));
3030
Steps.push_back(MakeHolder<TInitDataStep>(this));
31+
Steps.push_back(MakeHolder<TInitEndWriteTimestampStep>(this));
3132

3233
CurrentStep = Steps.begin();
3334
}
@@ -311,14 +312,14 @@ void TInitMetaStep::LoadMeta(const NKikimrClient::TResponse& kvResponse, const T
311312
bool res = meta.ParseFromString(response.GetValue());
312313
Y_ABORT_UNLESS(res);
313314

314-
/* Bring back later, when switch to 21-2 will be unable
315-
StartOffset = meta.GetStartOffset();
316-
EndOffset = meta.GetEndOffset();
317-
if (StartOffset == EndOffset) {
318-
NewHead.Offset = Head.Offset = EndOffset;
319-
}
320-
*/
315+
Partition()->StartOffset = meta.GetStartOffset();
316+
Partition()->EndOffset = meta.GetEndOffset();
317+
if (Partition()->StartOffset == Partition()->EndOffset) {
318+
Partition()->NewHead.Offset = Partition()->Head.Offset = Partition()->EndOffset;
319+
}
321320
Partition()->SubDomainOutOfSpace = meta.GetSubDomainOutOfSpace();
321+
Partition()->EndWriteTimestamp = TInstant::MilliSeconds(meta.GetEndWriteTimestamp());
322+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
322323
if (Partition()->IsSupportive()) {
323324
const auto& counterData = meta.GetCounterData();
324325
Partition()->BytesWrittenGrpc.SetSavedValue(counterData.GetBytesWrittenGrpc());
@@ -503,7 +504,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
503504
if (k.GetPartNo() > 0) ++startOffset;
504505
head.PartNo = 0;
505506
} else {
506-
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%s", pair.GetKey().c_str());
507+
Y_ABORT_UNLESS(endOffset <= k.GetOffset(), "%" PRIu64 " <= %" PRIu64 " %s", endOffset, k.GetOffset(), pair.GetKey().c_str());
507508
if (endOffset < k.GetOffset()) {
508509
gapOffsets.push_back(std::make_pair(endOffset, k.GetOffset()));
509510
gapSize += k.GetOffset() - endOffset;
@@ -631,7 +632,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
631632

632633
Y_ABORT_UNLESS(offset + 1 >= Partition()->StartOffset);
633634
Y_ABORT_UNLESS(offset < Partition()->EndOffset);
634-
Y_ABORT_UNLESS(size == read.GetValue().size());
635+
Y_ABORT_UNLESS(size == read.GetValue().size(), "size=%" PRIu32 " == read.GetValue().size() =%" PRIu64, size, read.GetValue().size());
635636

636637
for (TBlobIterator it(key, read.GetValue()); it.IsValid(); it.Next()) {
637638
head.AddBatch(it.GetBatch());
@@ -667,6 +668,43 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
667668
}
668669

669670

671+
//
672+
// TInitEndWriteTimestampStep
673+
//
674+
675+
TInitEndWriteTimestampStep::TInitEndWriteTimestampStep(TInitializer* initializer)
676+
: TInitializerStep(initializer, "TInitEndWriteTimestampStep", true) {
677+
}
678+
679+
void TInitEndWriteTimestampStep::Execute(const TActorContext &ctx) {
680+
if (Partition()->EndWriteTimestamp != TInstant::Zero() || (Partition()->HeadKeys.empty() && Partition()->DataKeysBody.empty())) {
681+
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
682+
"Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
683+
<< "' partition " << Partition()->Partition
684+
<< " skiped because already initialized.");
685+
return Done(ctx);
686+
}
687+
688+
TDataKey* lastKey = nullptr;
689+
if (!Partition()->HeadKeys.empty()) {
690+
lastKey = &Partition()->HeadKeys.back();
691+
} else if (!Partition()->DataKeysBody.empty()) {
692+
lastKey = &Partition()->DataKeysBody.back();
693+
}
694+
695+
if (lastKey) {
696+
Partition()->EndWriteTimestamp = lastKey->Timestamp;
697+
Partition()->PendingWriteTimestamp = Partition()->EndWriteTimestamp;
698+
}
699+
700+
LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE,
701+
"Initializing EndWriteTimestamp of the topic '" << Partition()->TopicName()
702+
<< "' partition " << Partition()->Partition
703+
<< " from keys completed. Value " << Partition()->EndWriteTimestamp);
704+
705+
return Done(ctx);
706+
}
707+
670708
//
671709
// TPartition
672710
//

ydb/core/persqueue/partition_init.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,4 +152,11 @@ class TInitDataStep: public TBaseKVStep {
152152
void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx) override;
153153
};
154154

155+
class TInitEndWriteTimestampStep: public TInitializerStep {
156+
public:
157+
TInitEndWriteTimestampStep(TInitializer* initializer);
158+
159+
void Execute(const TActorContext& ctx) override;
160+
};
161+
155162
} // NKikimr::NPQ

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
5959
PROPERTY("StartOffset", StartOffset);
6060
PROPERTY("EndOffset", EndOffset);
6161
PROPERTY("LastOffset", Head.GetNextOffset());
62+
PROPERTY("Last message WriteTimestamp", EndWriteTimestamp.ToRfc822String());
6263
PROPERTY("HeadOffset", Head.Offset << ", count: " << Head.GetCount());
6364
}
6465
}

ydb/core/persqueue/partition_read.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,24 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_USER_ACTS = 1000;
3131

32+
TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) {
33+
if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) {
34+
return {};
35+
}
36+
37+
TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(maxTimeLagMs) : TInstant::Zero();
38+
timestamp = Max(timestamp, TInstant::MilliSeconds(readTimestampMs));
39+
timestamp = Max(timestamp, consumerReadFromTimestamp);
40+
return timestamp;
41+
}
42+
43+
ui64 TPartition::GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const {
44+
if (!readTimestamp) {
45+
return offset;
46+
}
47+
return Max(GetOffsetEstimate(DataKeysBody, *readTimestamp, Min(Head.Offset, EndOffset - 1)), offset);
48+
}
49+
3250
void TPartition::SendReadingFinished(const TString& consumer) {
3351
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie));
3452
}
@@ -132,7 +150,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
132150
};
133151

134152
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
135-
if (request->Offset < EndOffset) {
153+
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
136154
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
137155
ctx.Send(request->Sender, response.Release());
138156
} else if (!IsActive()) {
@@ -169,16 +187,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
169187

170188
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
171189

190+
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
191+
172192
TActorId sender = ActorIdFromProto(record.GetSender());
173-
if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now
193+
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
174194
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
175195
ctx.Send(sender, response.Release());
176196
} else if (InitDone && !IsActive()) {
177197
auto response = MakeHasDataInfoResponse(0, cookie, true);
178198
ctx.Send(sender, response.Release());
179199
} else {
180200
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
181-
record.HasClientId() && InitDone ? record.GetClientId() : ""};
201+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
182202
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
183203
auto res = HasDataRequests.insert(req);
184204
HasDataDeadlines.insert(dl);
@@ -756,11 +776,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim
756776
}
757777
userInfo->ReadsInQuotaQueue--;
758778
ui64 offset = read->Offset;
759-
if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) {
760-
TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero();
761-
timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs));
762-
timestamp = Max(timestamp, userInfo->ReadFromTimestamp);
763-
offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset);
779+
780+
auto readTimestamp = GetReadFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp, ctx);
781+
if (read->PartNo == 0 && readTimestamp) {
782+
offset = GetReadOffset(offset, readTimestamp);
764783
userInfo->ReadOffsetRewindSum += offset - read->Offset;
765784
}
766785

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct TPartition::THasDataReq {
113113
TActorId Sender;
114114
TMaybe<ui64> Cookie;
115115
TString ClientId;
116+
TMaybe<TInstant> ReadTimestamp;
116117

117118
bool operator < (const THasDataReq& req) const {
118119
return Num < req.Num;

ydb/core/persqueue/partition_write.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,9 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
382382
void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
383383
PQ_LOG_T("TPartition::SyncMemoryStateWithKVState.");
384384

385-
if (!CompactedKeys.empty())
385+
if (!CompactedKeys.empty()) {
386386
HeadKeys.clear();
387+
}
387388

388389
if (NewHeadKey.Size > 0) {
389390
while (!HeadKeys.empty() &&
@@ -438,6 +439,8 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
438439
}
439440

440441
EndOffset = Head.GetNextOffset();
442+
EndWriteTimestamp = PendingWriteTimestamp;
443+
441444
NewHead.Clear();
442445
NewHead.Offset = EndOffset;
443446

@@ -1272,6 +1275,11 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
12721275
++WriteNewMessagesInternal;
12731276
}
12741277

1278+
// Empty partition may will be filling from offset great than zero from mirror actor if source partition old and was clean by retantion time
1279+
if (!Head.GetCount() && !NewHead.GetCount() && DataKeysBody.empty() && HeadKeys.empty() && p.Offset) {
1280+
StartOffset = *p.Offset;
1281+
}
1282+
12751283
TMaybe<TPartData> partData;
12761284
if (p.Msg.TotalParts > 1) { //this is multi-part message
12771285
partData = TPartData(p.Msg.PartNo, p.Msg.TotalParts, p.Msg.TotalSize);
@@ -1420,12 +1428,15 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
14201428
Y_ABORT_UNLESS(Head.GetBatch(pp).GetPartNo() == key.GetPartNo());
14211429
for (; pp < Head.GetBatches().size(); ++pp) { //TODO - merge small batches here
14221430
Y_ABORT_UNLESS(Head.GetBatch(pp).Packed);
1423-
Head.GetBatch(pp).SerializeTo(valueD);
1431+
auto& batch = Head.GetBatch(pp);
1432+
batch.SerializeTo(valueD);
1433+
PendingWriteTimestamp = std::max(PendingWriteTimestamp, batch.GetEndWriteTimestamp());
14241434
}
14251435
}
14261436
for (auto& b : NewHead.GetBatches()) {
14271437
Y_ABORT_UNLESS(b.Packed);
14281438
b.SerializeTo(valueD);
1439+
PendingWriteTimestamp = std::max(PendingWriteTimestamp, b.GetEndWriteTimestamp());
14291440
}
14301441

14311442
Y_ABORT_UNLESS(res.second >= valueD.size());

0 commit comments

Comments
 (0)