@@ -88,6 +88,10 @@ struct TEvPrivate {
88
88
struct TEvPqEventsReady : public TEventLocal <TEvPqEventsReady, EvPqEventsReady> {};
89
89
};
90
90
91
+ TString MakeStringForLog (const NDqProto::TCheckpoint& checkpoint) {
92
+ return TStringBuilder () << " [Checkpoint " << checkpoint.GetGeneration () << " ." << checkpoint.GetId () << " ] " ;
93
+ }
94
+
91
95
} // namespace
92
96
93
97
class TDqPqWriteActor : public NActors ::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput {
@@ -155,6 +159,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
155
159
, LogPrefix(TStringBuilder() << " SelfId: " << this ->SelfId () << ", TxId: " << TxId << ", TaskId: " << taskId << ", PQ sink. ")
156
160
, FreeSpace(freeSpace)
157
161
, PqGateway(pqGateway)
162
+ , TaskId(taskId)
158
163
{
159
164
EgressStats.Level = statsLevel;
160
165
}
@@ -215,11 +220,11 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
215
220
216
221
if (checkpoint) {
217
222
if (Buffer.empty () && WaitingAcks.empty ()) {
218
- SINK_LOG_D (" Send checkpoint state immediately" );
223
+ SINK_LOG_D (MakeStringForLog (*checkpoint) << " Send checkpoint state immediately" );
219
224
Callbacks->OnAsyncOutputStateSaved (BuildState (*checkpoint), OutputIndex, *checkpoint);
220
225
} else {
221
226
ui64 seqNo = NextSeqNo + Buffer.size () - 1 ;
222
- SINK_LOG_D (" Defer sending the checkpoint, seqNo: " << seqNo);
227
+ SINK_LOG_D (MakeStringForLog (*checkpoint) << " Defer sending the checkpoint, seqNo: " << seqNo);
223
228
Metrics.InFlyCheckpoints ->Inc ();
224
229
DeferredCheckpoints.emplace (seqNo, *checkpoint);
225
230
}
@@ -275,10 +280,18 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
275
280
)
276
281
277
282
void Handle(TEvPrivate::TEvPqEventsReady::TPtr&) {
283
+ if (!Inited) {
284
+ Init ();
285
+ Inited = true ;
286
+ }
278
287
while (HandleNewPQEvents ()) { }
279
288
SubscribeOnNextEvent ();
280
289
}
281
290
291
+ void Init () {
292
+ LogPrefix = TStringBuilder () << " SelfId: " << this ->SelfId () << " , TxId: " << TxId << " , TaskId: " << TaskId << " , PQ sink. " ;
293
+ }
294
+
282
295
// IActor & IDqComputeActorAsyncOutput
283
296
void PassAway () override { // Is called from Compute Actor
284
297
if (WriteSession) {
@@ -433,7 +446,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
433
446
if (!Self.DeferredCheckpoints .empty () && std::get<0 >(Self.DeferredCheckpoints .front ()) == it->SeqNo ) {
434
447
Self.ConfirmedSeqNo = it->SeqNo ;
435
448
const auto & checkpoint = std::get<1 >(Self.DeferredCheckpoints .front ());
436
- LOG_D (Self.LogPrefix << " Send a deferred checkpoint, seqNo: " << it->SeqNo );
449
+ LOG_D (Self.LogPrefix << MakeStringForLog (checkpoint) << " Send a deferred checkpoint, seqNo: " << it->SeqNo );
437
450
Self.Callbacks ->OnAsyncOutputStateSaved (Self.BuildState (checkpoint), Self.OutputIndex , checkpoint);
438
451
Self.DeferredCheckpoints .pop ();
439
452
Self.Metrics .InFlyCheckpoints ->Dec ();
@@ -479,7 +492,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
479
492
NYdb::TDriver Driver;
480
493
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
481
494
IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
482
- const TString LogPrefix;
495
+ TString LogPrefix;
483
496
i64 FreeSpace = 0 ;
484
497
bool Finished = false ;
485
498
@@ -495,6 +508,8 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
495
508
std::queue<TAckInfo> WaitingAcks; // Size of items which are waiting for acks (used to update free space)
496
509
std::queue<std::tuple<ui64, NDqProto::TCheckpoint>> DeferredCheckpoints;
497
510
IPqGateway::TPtr PqGateway;
511
+ ui64 TaskId;
512
+ bool Inited = false ;
498
513
};
499
514
500
515
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor (
0 commit comments