@@ -45,7 +45,6 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
45
45
46
46
Become (&TThis::StateWork);
47
47
Y_UNUSED (ctx);
48
- // Y_ABORT_UNLESS(Counters);
49
48
}
50
49
51
50
STRICT_STFUNC (StateWork,
@@ -66,6 +65,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
66
65
const auto & ctx = ActorContext ();
67
66
auto key = MakeSessionKey (ev->Get ());
68
67
PQ_CPROXY_LOG_D (" client session connected with id '" << key.SessionId << " '" );
68
+ ChangeCounterValue (" CreateClientSessionRate" , 1 , false , true );
69
69
auto sessionIter = ServerSessions.find (key);
70
70
if (sessionIter.IsEnd ()) {
71
71
PQ_CPROXY_LOG_D (" unknown session id '" << key.SessionId << " ', close session" );
@@ -88,7 +88,11 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
88
88
// Without this response, the client might have to wait until there are topic messages to send.
89
89
ctx.Send (sender, new TEvPQProxy::TEvDirectReadDataSessionConnectedResponse (key.PartitionSessionId , ev->Get ()->Generation ));
90
90
91
+ if (!sessionIter->second .Client .Defined ()) {
92
+ ChangeCounterValue (" ActiveClientSessions" , 1 , false );
93
+ } // else Its probably a misbehavior by client (or proxy) but we can handle it anyway
91
94
sessionIter->second .Client = TCacheClientContext{sender, startingReadId};
95
+
92
96
AssignByProxy[sender].insert (key.PartitionSessionId );
93
97
while (SendNextReadToClient (sessionIter)) {
94
98
// Empty
@@ -105,6 +109,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
105
109
Ydb::PersQueue::ErrorCode::ErrorCode::OK, " " , ev->Sender
106
110
);
107
111
}
112
+ AssignByProxy.erase (assignIter);
108
113
}
109
114
110
115
void HandleRegister (TEvPQ::TEvRegisterDirectReadSession::TPtr& ev) {
@@ -149,6 +154,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
149
154
}
150
155
ChangeCounterValue (" StagedReadDataSize" , ins.first ->second ->ByteSize (), false );
151
156
ChangeCounterValue (" StagedReadsCount" , 1 , false );
157
+ ChangeCounterValue (" StagedReadsRate" , 1 , false , true );
152
158
PQ_CPROXY_LOG_D (" staged direct read id " << ev->Get ()->ReadKey .ReadId << " for session: " << sessionKey.SessionId );
153
159
}
154
160
@@ -178,6 +184,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
178
184
if (inserted) {
179
185
ChangeCounterValue (" PublishedReadDataSize" , stagedIter->second ->ByteSize (), false );
180
186
ChangeCounterValue (" PublishedReadsCount" , 1 , false );
187
+ ChangeCounterValue (" PublishedReadsRate" , 1 , false , true );
181
188
}
182
189
ChangeCounterValue (" StagedReadDataSize" , -stagedIter->second ->ByteSize (), false );
183
190
ChangeCounterValue (" StagedReadsCount" , -1 , false );
@@ -201,19 +208,25 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
201
208
if (iter->second .Generation != generation) { // Stale generation in event, ignore it
202
209
return ;
203
210
}
211
+ bool didForget = false ;
204
212
auto readIter = iter->second .Reads .find (ev->Get ()->ReadKey .ReadId );
205
213
if (readIter != iter->second .Reads .end ()) {
206
214
ChangeCounterValue (" PublishedReadDataSize" , -readIter->second ->ByteSize (), false );
207
215
ChangeCounterValue (" PublishedReadsCount" , -1 , false );
216
+ didForget = true ;
208
217
209
218
iter->second .Reads .erase (readIter);
210
219
}
211
220
auto stagedIter = iter->second .StagedReads .find (ev->Get ()->ReadKey .ReadId );
212
221
if (stagedIter != iter->second .StagedReads .end ()) {
213
222
ChangeCounterValue (" StagedReadDataSize" , -stagedIter->second ->ByteSize (), false );
214
223
ChangeCounterValue (" StagedReadsCount" , -1 , false );
224
+ didForget = true ;
215
225
iter->second .StagedReads .erase (stagedIter);
216
226
}
227
+ if (didForget) {
228
+ ChangeCounterValue (" ForgetReadsRate" , 1 , false , true );
229
+ }
217
230
iter->second .StagedReads .erase (ev->Get ()->ReadKey .ReadId );
218
231
}
219
232
@@ -234,13 +247,15 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
234
247
if (!assignIter.IsEnd ()) {
235
248
assignIter->second .erase (sessionIter->first .PartitionSessionId );
236
249
}
250
+ if (sessionIter->second .Client .Defined ()) {
251
+ ChangeCounterValue (" ActiveClientSessions" , -1 , false );
252
+ }
237
253
sessionIter->second .Client = Nothing ();
238
254
}
239
255
240
256
[[nodiscard]] bool DestroyServerSession (TSessionsMap::iterator sessionIter, ui64 generation) {
241
257
if (sessionIter.IsEnd () || sessionIter->second .Generation > generation)
242
258
return false ;
243
- Cerr << " CahceProxy: DestroyServerSession with generation: " << generation << Endl;
244
259
DestroyPartitionSession (sessionIter, Ydb::PersQueue::ErrorCode::READ_ERROR_NO_SESSION, " Closed by server" );
245
260
ServerSessions.erase (sessionIter);
246
261
ChangeCounterValue (" ActiveServerSessions" , ServerSessions.size (), true );
@@ -311,6 +326,7 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
311
326
return false ;
312
327
}
313
328
auto result = SendData (sessionIter->first .PartitionSessionId , client, nextData->first , nextData->second );
329
+ ChangeCounterValue (" SendDataRate" , 1 , false , true );
314
330
if (!result) {
315
331
// ToDo: for discuss. Error in parsing partition response - shall we kill the entire session or just the partition session?
316
332
DestroyClientSession (sessionIter, false , Ydb::PersQueue::ErrorCode::OK, " " );
@@ -369,10 +385,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
369
385
return true ;
370
386
}
371
387
372
- void ChangeCounterValue (const TString& name, i64 value, bool isAbs) {
388
+ void ChangeCounterValue (const TString& name, i64 value, bool isAbs, bool deriv = false ) {
373
389
if (!Counters)
374
390
return ;
375
- auto counter = Counters->GetCounter (name, false );
391
+ auto counter = Counters->GetCounter (name, deriv );
376
392
if (isAbs)
377
393
counter->Set (value);
378
394
else if (value >= 0 )
@@ -510,9 +526,12 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
510
526
511
527
512
528
IActor* CreatePQDReadCacheService (const ::NMonitoring::TDynamicCounterPtr& counters) {
513
- Y_VERIFY_DEBUG (counters);
514
- return new TPQDirectReadCacheService (
515
- GetServiceCounters (counters, " persqueue" )->GetSubgroup (" subsystem" , " caching_service" ));
529
+ if (counters) {
530
+ return new TPQDirectReadCacheService (
531
+ GetServiceCounters (counters, " persqueue" )->GetSubgroup (" subsystem" , " caching_service" ));
532
+ } else {
533
+ return new TPQDirectReadCacheService (nullptr );
534
+ }
516
535
}
517
536
518
537
} // namespace NKikimr::NPQ
0 commit comments