Skip to content

Commit a1a979d

Browse files
committed
Fixed possible stop of reading from the topic partition (ydb-platform#15288)
1 parent 427bec5 commit a1a979d

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
240240
void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
241241
void ProcessChangeOwnerRequests(const TActorContext& ctx);
242242
void ProcessHasDataRequests(const TActorContext& ctx);
243+
bool ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx);
243244
void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
244245
void ProcessReserveRequests(const TActorContext& ctx);
245246
void ProcessTimestampRead(const TActorContext& ctx);

ydb/core/persqueue/partition_read.cpp

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,32 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
135135
return res;
136136
}
137137

138+
bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx) {
139+
auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
140+
auto response = MakeHasDataInfoResponse(lagSize, request.Cookie, readingFinished);
141+
ctx.Send(request.Sender, response.Release());
142+
};
143+
144+
if (!IsActive()) {
145+
if (request.Offset < EndOffset && (!request.ReadTimestamp || *request.ReadTimestamp <= EndWriteTimestamp)) {
146+
sendResponse(GetSizeLag(request.Offset), false);
147+
} else {
148+
sendResponse(0, true);
149+
150+
auto now = ctx.Now();
151+
auto& userInfo = UsersInfoStorage->GetOrCreate(request.ClientId, ctx);
152+
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
153+
}
154+
} else if (request.Offset < EndOffset) {
155+
sendResponse(GetSizeLag(request.Offset), false);
156+
} else {
157+
return false;
158+
}
159+
160+
return true;
161+
}
162+
163+
138164
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
139165
if (!InitDone) {
140166
return;
@@ -150,13 +176,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
150176
};
151177

152178
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
153-
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
154-
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
155-
ctx.Send(request->Sender, response.Release());
156-
} else if (!IsActive()) {
157-
auto response = MakeHasDataInfoResponse(0, request->Cookie, true);
158-
ctx.Send(request->Sender, response.Release());
159-
} else {
179+
if (!ProcessHasDataRequest(*request, ctx)) {
160180
break;
161181
}
162182

@@ -185,33 +205,23 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
185205
auto& record = ev->Get()->Record;
186206
Y_ABORT_UNLESS(record.HasSender());
187207

188-
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
208+
auto now = ctx.Now();
189209

210+
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
190211
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx);
191212

192213
TActorId sender = ActorIdFromProto(record.GetSender());
193-
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
194-
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
195-
ctx.Send(sender, response.Release());
196-
} else if (InitDone && !IsActive()) {
197-
auto now = ctx.Now();
198214

199-
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
200-
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
215+
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
216+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
201217

202-
auto response = MakeHasDataInfoResponse(0, cookie, true);
203-
ctx.Send(sender, response.Release());
204-
} else {
205-
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
206-
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
218+
if (!InitDone || !ProcessHasDataRequest(req, ctx)) {
207219
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
208-
auto res = HasDataRequests.insert(req);
220+
auto res = HasDataRequests.insert(std::move(req));
209221
HasDataDeadlines.insert(dl);
210222
Y_ABORT_UNLESS(res.second);
211223

212224
if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
213-
auto now = ctx.Now();
214-
215225
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
216226
++userInfo.Subscriptions;
217227
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now);

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ struct TPartition::THasDataReq {
116116
TMaybe<TInstant> ReadTimestamp;
117117

118118
bool operator < (const THasDataReq& req) const {
119-
return Num < req.Num;
119+
return std::tuple(Offset, Num) < std::tuple(req.Offset, req.Num);
120120
}
121121
};
122122

0 commit comments

Comments
 (0)