Skip to content

Commit dd37eab

Browse files
authored
Merge 4f02750 into 312918a
2 parents 312918a + 4f02750 commit dd37eab

File tree

3 files changed

+32
-23
lines changed

3 files changed

+32
-23
lines changed

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
243243
void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
244244
void ProcessChangeOwnerRequests(const TActorContext& ctx);
245245
void ProcessHasDataRequests(const TActorContext& ctx);
246+
bool ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx);
246247
void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
247248
void ProcessReserveRequests(const TActorContext& ctx);
248249
void ProcessTimestampRead(const TActorContext& ctx);

ydb/core/persqueue/partition_read.cpp

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,30 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
134134
return res;
135135
}
136136

137+
bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx) {
138+
auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
139+
auto response = MakeHasDataInfoResponse(lagSize, request.Cookie, readingFinished);
140+
ctx.Send(request.Sender, response.Release());
141+
};
142+
143+
if (!IsActive()) {
144+
if (request.ReadTimestamp && *request.ReadTimestamp <= EndWriteTimestamp) {
145+
sendResponse(GetSizeLag(request.Offset), false);
146+
} else if (!request.ReadTimestamp && request.Offset < EndOffset) {
147+
sendResponse(GetSizeLag(request.Offset), false);
148+
} else {
149+
sendResponse(0, true);
150+
}
151+
} else if (request.Offset < EndOffset) {
152+
sendResponse(GetSizeLag(request.Offset), false);
153+
} else {
154+
return false;
155+
}
156+
157+
return true;
158+
}
159+
160+
137161
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
138162
if (!InitDone) {
139163
return;
@@ -148,13 +172,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
148172
};
149173

150174
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
151-
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
152-
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
153-
ctx.Send(request->Sender, response.Release());
154-
} else if (!IsActive()) {
155-
auto response = MakeHasDataInfoResponse(0, request->Cookie, true);
156-
ctx.Send(request->Sender, response.Release());
157-
} else {
175+
if (!ProcessHasDataRequest(*request, ctx)) {
158176
break;
159177
}
160178

@@ -183,32 +201,22 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
183201
auto& record = ev->Get()->Record;
184202
Y_ABORT_UNLESS(record.HasSender());
185203

204+
auto now = ctx.Now();
205+
186206
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
187207
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero(), ctx);
188-
189208
TActorId sender = ActorIdFromProto(record.GetSender());
190-
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
191-
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
192-
ctx.Send(sender, response.Release());
193-
} else if (InitDone && !IsActive()) {
194-
auto now = ctx.Now();
195209

196-
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
197-
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
210+
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
211+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
198212

199-
auto response = MakeHasDataInfoResponse(0, cookie, true);
200-
ctx.Send(sender, response.Release());
201-
} else {
202-
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
203-
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
213+
if (InitDone && !ProcessHasDataRequest(req, ctx)) {
204214
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
205215
auto res = HasDataRequests.insert(req);
206216
HasDataDeadlines.insert(dl);
207217
Y_ABORT_UNLESS(res.second);
208218

209219
if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
210-
auto now = ctx.Now();
211-
212220
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
213221
++userInfo.Subscriptions;
214222
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now);

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ struct TPartition::THasDataReq {
116116
TMaybe<TInstant> ReadTimestamp;
117117

118118
bool operator < (const THasDataReq& req) const {
119-
return Num < req.Num;
119+
return std::tuple(Offset, Num) < std::tuple(req.Offset, req.Num);
120120
}
121121
};
122122

0 commit comments

Comments
 (0)