30
30
#include < util/generic/algorithm.h>
31
31
#include < util/generic/hash.h>
32
32
#include < util/generic/utility.h>
33
+ #include < util/string/join.h>
33
34
34
35
#include < queue>
35
36
#include < variant>
@@ -91,6 +92,7 @@ struct TEvPrivate {
91
92
class TDqPqReadActor : public NActors ::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
92
93
public:
93
94
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
95
+ using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
94
96
95
97
TDqPqReadActor (
96
98
ui64 inputIndex,
@@ -170,8 +172,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
170
172
data->SetVersion (StateVersion);
171
173
data->SetBlob (stateBlob);
172
174
173
- DeferredCommits.emplace (checkpoint.GetId (), std::move (CurrentDeferredCommit));
175
+ DeferredCommits.emplace (checkpoint.GetId (), std::make_pair ( std:: move (CurrentDeferredCommit), CurrentDeferredCommitOffset ));
174
176
CurrentDeferredCommit = NYdb::NPersQueue::TDeferredCommit ();
177
+ CurrentDeferredCommitOffset.Clear ();
175
178
}
176
179
177
180
void LoadState (const NDqProto::TSourceState& state) override {
@@ -198,6 +201,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
198
201
ythrow yexception () << " Invalid state version " << data.GetVersion ();
199
202
}
200
203
}
204
+ for (const auto & [key, value] : PartitionToOffset) {
205
+ SRC_LOG_D (" Restoring offset: cluster " << key.first << " , partition id " << key.second << " , offset: " << value);
206
+ }
201
207
StartingMessageTimestamp = minStartingMessageTs;
202
208
IngressStats.Bytes += ingressBytes;
203
209
IngressStats.Chunks ++;
@@ -212,7 +218,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
212
218
void CommitState (const NDqProto::TCheckpoint& checkpoint) override {
213
219
const auto checkpointId = checkpoint.GetId ();
214
220
while (!DeferredCommits.empty () && DeferredCommits.front ().first <= checkpointId) {
215
- DeferredCommits.front ().second .Commit ();
221
+ auto & valuePair = DeferredCommits.front ().second ;
222
+ const auto & offsets = valuePair.second ;
223
+ if (offsets.Empty ()) {
224
+ SRC_LOG_D (" Commit offset: [ empty ]" );
225
+ } else {
226
+ SRC_LOG_D (" Commit offset: [" << offsets->first << " , " << offsets->second << " ]" );
227
+ }
228
+ valuePair.first .Commit ();
216
229
DeferredCommits.pop ();
217
230
}
218
231
}
@@ -364,7 +377,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
364
377
NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings () const {
365
378
NYdb::NPersQueue::TTopicReadSettings topicReadSettings;
366
379
topicReadSettings.Path (SourceParams.GetTopicPath ());
367
- for (const auto partitionId : GetPartitionsToRead ()) {
380
+ auto partitionsToRead = GetPartitionsToRead ();
381
+ SRC_LOG_D (" PartitionsToRead: " << JoinSeq (" , " , partitionsToRead));
382
+ for (const auto partitionId : partitionsToRead) {
368
383
topicReadSettings.AppendPartitionGroupIds (partitionId);
369
384
}
370
385
@@ -419,6 +434,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
419
434
for (const auto & [partitionStream, ranges] : readyBatch.OffsetRanges ) {
420
435
for (const auto & [start, end] : ranges) {
421
436
CurrentDeferredCommit.Add (partitionStream, start, end);
437
+ if (!CurrentDeferredCommitOffset) {
438
+ CurrentDeferredCommitOffset = std::make_pair (start, end);
439
+ } else {
440
+ CurrentDeferredCommitOffset->first = std::min (CurrentDeferredCommitOffset->first , start);
441
+ CurrentDeferredCommitOffset->second = std::max (CurrentDeferredCommitOffset->second , end);
442
+ }
422
443
}
423
444
PartitionToOffset[MakePartitionKey (partitionStream)] = ranges.back ().second ;
424
445
}
@@ -574,8 +595,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
574
595
THashMap<TPartitionKey, ui64> PartitionToOffset; // {cluster, partition} -> offset of next event.
575
596
TInstant StartingMessageTimestamp;
576
597
const NActors::TActorId ComputeActorId;
577
- std::queue<std::pair<ui64, NYdb::NPersQueue::TDeferredCommit>> DeferredCommits;
598
+ std::queue<std::pair<ui64, std::pair< NYdb::NPersQueue::TDeferredCommit, TDebugOffsets> >> DeferredCommits;
578
599
NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit;
600
+ TDebugOffsets CurrentDeferredCommitOffset;
579
601
bool SubscribedOnEvent = false ;
580
602
std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields;
581
603
std::queue<TReadyBatch> ReadyBuffer;
0 commit comments