Skip to content

Commit 4c13a9e

Browse files
authored
YQ-2862 Add more logs to pq_dq_read_actor (#2335)
1 parent 1c690cb commit 4c13a9e

File tree

2 files changed

+19
-4
lines changed

2 files changed

+19
-4
lines changed

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
382382
NYdb::NPersQueue::TTopicReadSettings topicReadSettings;
383383
topicReadSettings.Path(SourceParams.GetTopicPath());
384384
auto partitionsToRead = GetPartitionsToRead();
385-
SRC_LOG_D("PartitionsToRead: " << JoinSeq(", ", partitionsToRead));
385+
SRC_LOG_D("RangesMode: " << RangesMode << ", PartitionsToRead: " << JoinSeq(", ", partitionsToRead));
386386
for (const auto partitionId : partitionsToRead) {
387387
topicReadSettings.AppendPartitionGroupIds(partitionId);
388388
}
@@ -495,6 +495,8 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
495495
curBatch.Data.emplace_back(std::move(item));
496496
curBatch.UsedSpace += size;
497497

498+
CheckAndUpdateOffset(message.GetPartitionStream(), message.GetOffset());
499+
498500
auto& offsets = curBatch.OffsetRanges[message.GetPartitionStream()];
499501
if (!offsets.empty() && offsets.back().second == message.GetOffset()) {
500502
offsets.back().second = message.GetOffset() + 1;
@@ -577,6 +579,18 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
577579
return std::make_pair(item, usedSpace);
578580
}
579581

582+
void CheckAndUpdateOffset(const NYdb::NPersQueue::TPartitionStream::TPtr& partitionStreamPtr, ui64 offset) {
583+
auto offsetIt = Self.LastOffsetByPartitionStream.find(partitionStreamPtr);
584+
if (offsetIt == Self.LastOffsetByPartitionStream.end()) {
585+
Self.LastOffsetByPartitionStream[partitionStreamPtr] = offset;
586+
return;
587+
}
588+
ui64 lastOffset = offsetIt->second;
589+
if (offset <= lastOffset) {
590+
ythrow yexception() << "Invalid message offset " << offset << ", last offset " << lastOffset;
591+
}
592+
}
593+
580594
TDqPqReadActor& Self;
581595
ui32 BatchCapacity;
582596
const TString& LogPrefix;
@@ -609,6 +623,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
609623
std::queue<TReadyBatch> ReadyBuffer;
610624
TMaybe<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
611625
TMaybe<TInstant> NextIdlenesCheckAt;
626+
THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, ui64> LastOffsetByPartitionStream;
612627
};
613628

614629
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 e
7777
Commits.EraseInterval(0, endOffset); // Drop only committed ranges;
7878
}
7979
for (auto range: toCommit) {
80-
sessionShared->Commit(this, range.first, range.second);
80+
sessionShared->Commit(this, range.first, Min(range.second, endOffset));
8181
}
8282
}
8383
}
@@ -1121,7 +1121,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
11211121
TDeferredActions<true>& deferred) {
11221122
Y_ABORT_UNLESS(Lock.IsLocked());
11231123

1124-
LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
1124+
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Committed response: " << msg);
11251125

11261126
TMap<ui64, TIntrusivePtr<TPartitionStreamImpl<true>>> partitionStreams;
11271127
for (const Ydb::PersQueue::V1::CommitCookie& cookieProto : msg.cookies()) {
@@ -1377,7 +1377,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
13771377
TDeferredActions<false>& deferred) {
13781378
Y_ABORT_UNLESS(Lock.IsLocked());
13791379

1380-
LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg);
1380+
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Committed response: " << msg);
13811381

13821382
for (const auto& rangeProto : msg.partitions_committed_offsets()) {
13831383
auto partitionStreamIt = PartitionStreams.find(rangeProto.partition_session_id());

0 commit comments

Comments
 (0)