Skip to content

Commit a15df6a

Browse files
authored
Merge cf72985 into 9eea4e3
2 parents 9eea4e3 + cf72985 commit a15df6a

File tree

3 files changed

+35
-24
lines changed

3 files changed

+35
-24
lines changed

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
243243
void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
244244
void ProcessChangeOwnerRequests(const TActorContext& ctx);
245245
void ProcessHasDataRequests(const TActorContext& ctx);
246+
bool ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx);
246247
void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
247248
void ProcessReserveRequests(const TActorContext& ctx);
248249
void ProcessTimestampRead(const TActorContext& ctx);

ydb/core/persqueue/partition_read.cpp

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,32 @@ TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> TPartition::MakeHasDataInfoRespon
134134
return res;
135135
}
136136

137+
bool TPartition::ProcessHasDataRequest(const THasDataReq& request, const TActorContext& ctx) {
138+
auto sendResponse = [&](ui64 lagSize, bool readingFinished) {
139+
auto response = MakeHasDataInfoResponse(lagSize, request.Cookie, readingFinished);
140+
ctx.Send(request.Sender, response.Release());
141+
};
142+
143+
if (!IsActive()) {
144+
if (request.Offset < EndOffset && (!request.ReadTimestamp || *request.ReadTimestamp <= EndWriteTimestamp)) {
145+
sendResponse(GetSizeLag(request.Offset), false);
146+
} else {
147+
sendResponse(0, true);
148+
149+
auto now = ctx.Now();
150+
auto& userInfo = UsersInfoStorage->GetOrCreate(request.ClientId, ctx);
151+
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
152+
}
153+
} else if (request.Offset < EndOffset) {
154+
sendResponse(GetSizeLag(request.Offset), false);
155+
} else {
156+
return false;
157+
}
158+
159+
return true;
160+
}
161+
162+
137163
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
138164
if (!InitDone) {
139165
return;
@@ -148,13 +174,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
148174
};
149175

150176
for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) {
151-
if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) {
152-
auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie);
153-
ctx.Send(request->Sender, response.Release());
154-
} else if (!IsActive()) {
155-
auto response = MakeHasDataInfoResponse(0, request->Cookie, true);
156-
ctx.Send(request->Sender, response.Release());
157-
} else {
177+
if (!ProcessHasDataRequest(*request, ctx)) {
158178
break;
159179
}
160180

@@ -183,32 +203,22 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont
183203
auto& record = ev->Get()->Record;
184204
Y_ABORT_UNLESS(record.HasSender());
185205

206+
auto now = ctx.Now();
207+
186208
auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>();
187209
auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero(), ctx);
188-
189210
TActorId sender = ActorIdFromProto(record.GetSender());
190-
if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now
191-
auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie);
192-
ctx.Send(sender, response.Release());
193-
} else if (InitDone && !IsActive()) {
194-
auto now = ctx.Now();
195211

196-
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
197-
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now, true);
212+
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
213+
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
198214

199-
auto response = MakeHasDataInfoResponse(0, cookie, true);
200-
ctx.Send(sender, response.Release());
201-
} else {
202-
THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie,
203-
record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp};
215+
if (!InitDone || !ProcessHasDataRequest(req, ctx)) {
204216
THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req};
205-
auto res = HasDataRequests.insert(req);
217+
auto res = HasDataRequests.insert(std::move(req));
206218
HasDataDeadlines.insert(dl);
207219
Y_ABORT_UNLESS(res.second);
208220

209221
if (InitDone && record.HasClientId() && !record.GetClientId().empty()) {
210-
auto now = ctx.Now();
211-
212222
auto& userInfo = UsersInfoStorage->GetOrCreate(record.GetClientId(), ctx);
213223
++userInfo.Subscriptions;
214224
userInfo.UpdateReadOffset((i64)EndOffset - 1, now, now, now);

ydb/core/persqueue/partition_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ struct TPartition::THasDataReq {
116116
TMaybe<TInstant> ReadTimestamp;
117117

118118
bool operator < (const THasDataReq& req) const {
119-
return Num < req.Num;
119+
return std::tuple(Offset, Num) < std::tuple(req.Offset, req.Num);
120120
}
121121
};
122122

0 commit comments

Comments
 (0)