8
8
#include < ydb/library/actors/core/hfunc.h>
9
9
#include < ydb/library/actors/core/interconnect.h>
10
10
#include < ydb/library/actors/core/log.h>
11
- #include < ydb/library/actors/helpers/flow_controlled_queue.h>
12
11
13
12
#include < util/digest/city.h>
14
13
#include < util/generic/xrange.h>
@@ -235,15 +234,11 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> {
235
234
}
236
235
237
236
template <typename TEv>
238
- void PrepareInit (TEv *ev, bool allowFlowControlled ) {
237
+ void PrepareInit (TEv *ev) {
239
238
TabletID = ev->TabletID ;
240
239
Cookie = ev->Cookie ;
241
240
ProxyOptions = ev->ProxyOptions ;
242
-
243
- if (allowFlowControlled && FlowControlledInfo.Get () && KIKIMR_ALLOW_FLOWCONTROLLED_QUEUE_FOR_SSLOOKUP)
244
- SelectRequestReplicas (FlowControlledInfo.Get ());
245
- else
246
- SelectRequestReplicas (Info.Get ());
241
+ SelectRequestReplicas (Info.Get ());
247
242
}
248
243
249
244
// request setup
@@ -253,7 +248,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> {
253
248
BLOG_D (" ProxyRequest::HandleInit ev: " << msg->ToString ());
254
249
Source = ev->Sender ;
255
250
256
- PrepareInit (msg, true );
251
+ PrepareInit (msg);
257
252
SendRequest ([this ](ui64 cookie) { return new TEvStateStorage::TEvReplicaLookup (TabletID, cookie); });
258
253
259
254
Become (&TThis::StateLookup, TDuration::MicroSeconds (StateStorageRequestTimeout), new TEvents::TEvWakeup ());
@@ -264,7 +259,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> {
264
259
BLOG_D (" ProxyRequest::HandleInit ev: %s" << msg->ToString ());
265
260
Source = ev->Sender ;
266
261
267
- PrepareInit (msg, false );
262
+ PrepareInit (msg);
268
263
269
264
SuggestedLeader = msg->ProposedLeader ;
270
265
SuggestedLeaderTablet = msg->ProposedLeaderTablet ;
@@ -286,7 +281,7 @@ class TStateStorageProxyRequest : public TActor<TStateStorageProxyRequest> {
286
281
BLOG_D (" ProxyRequest::HandleInit ev: " << msg->ToString ());
287
282
Source = ev->Sender ;
288
283
289
- PrepareInit (msg, false );
284
+ PrepareInit (msg);
290
285
291
286
SuggestedLeader = msg->ProposedLeader ;
292
287
SuggestedGeneration = msg->ProposedGeneration ;
@@ -866,13 +861,11 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {
866
861
867
862
void Handle (TEvStateStorage::TEvUpdateGroupConfig::TPtr &ev) {
868
863
auto *msg = ev->Get ();
869
- TIntrusivePtr<TStateStorageInfo> old = Info;
870
-
871
864
Info = msg->GroupConfig ;
872
865
BoardInfo = msg->BoardConfig ;
873
866
SchemeBoardInfo = msg->SchemeBoardConfig ;
874
867
875
- RegisterDerivedServices (TlsActivationContext->ExecutorThread .ActorSystem , old. Get () );
868
+ RegisterReplicaProbes (TlsActivationContext->ExecutorThread .ActorSystem );
876
869
877
870
for (const auto & [key, value] : Subscriptions) {
878
871
const auto & [sender, cookie] = key;
@@ -885,11 +878,6 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {
885
878
}
886
879
}
887
880
888
- void RegisterDerivedServices (TActorSystem *sys, const TStateStorageInfo *old) {
889
- RegisterReplicaProbes (sys);
890
- RegisterFlowContolled (sys, old);
891
- }
892
-
893
881
void RegisterReplicaProbes (TActorSystem *sys) {
894
882
if (!KIKIMR_ALLOW_SSREPLICA_PROBES)
895
883
return ;
@@ -916,65 +904,8 @@ class TStateStorageProxy : public TActor<TStateStorageProxy> {
916
904
Send (ev->Sender , reply.Release (), 0 , ev->Cookie );
917
905
}
918
906
919
- void RegisterFlowContolled (TActorSystem *sys, const TStateStorageInfo *old) {
920
- if (!KIKIMR_ALLOW_FLOWCONTROLLED_QUEUE_FOR_SSLOOKUP)
921
- return ;
922
-
923
- TIntrusivePtr<TStateStorageInfo> updated = new TStateStorageInfo ();
924
- updated->NToSelect = Info->NToSelect ;
925
- updated->Rings .resize (Info->Rings .size ());
926
-
927
- const bool checkOldInfo = FlowControlledInfo && old
928
- && updated->NToSelect == FlowControlledInfo->NToSelect
929
- && updated->Rings .size () == FlowControlledInfo->Rings .size ();
930
-
931
- ui32 ringIdx = 0 ;
932
- for (const ui32 ringsSz = Info->Rings .size (); ringIdx < ringsSz; ++ringIdx) {
933
- const bool checkRing = checkOldInfo && (FlowControlledInfo->Rings [ringIdx].Replicas .size () == Info->Rings [ringIdx].Replicas .size ());
934
-
935
- TStateStorageInfo::TRing &ctring = updated->Rings [ringIdx];
936
- TStateStorageInfo::TRing *fcring = checkRing ? &FlowControlledInfo->Rings [ringIdx] : nullptr ;
937
- const auto &srcring = Info->Rings [ringIdx];
938
- const auto *oldring = checkRing ? &old->Rings [ringIdx] : nullptr ;
939
-
940
- ctring.Replicas .resize (srcring.Replicas .size ());
941
- ui32 replicaIdx = 0 ;
942
- for (const ui32 srcSize = srcring.Replicas .size (); replicaIdx < srcSize; ++replicaIdx) {
943
- if (checkRing && srcring.Replicas [replicaIdx] == oldring->Replicas [replicaIdx]) {
944
- ctring.Replicas [replicaIdx] = fcring->Replicas [replicaIdx];
945
- fcring->Replicas [replicaIdx] = TActorId ();
946
- } else {
947
- if (fcring && replicaIdx < fcring->Replicas .size ())
948
- Send (fcring->Replicas [replicaIdx], new TEvents::TEvPoison ());
949
-
950
- TFlowControlledQueueConfig flowConfig;
951
- flowConfig.MaxAllowedInFly = 10000 ;
952
- flowConfig.TargetDynamicRate = 250000 ;
953
-
954
- ctring.Replicas [replicaIdx] = sys->Register (
955
- CreateFlowControlledRequestQueue (srcring.Replicas [replicaIdx], NKikimrServices::TActivity::SS_PROXY_REQUEST, flowConfig),
956
- TMailboxType::ReadAsFilled
957
- );
958
- }
959
- }
960
- if (fcring) {
961
- for (const ui32 fcSize = fcring->Replicas .size (); replicaIdx < fcSize; ++replicaIdx) {
962
- Send (fcring->Replicas [replicaIdx], new TEvents::TEvPoison ());
963
- }
964
- }
965
- }
966
- if (FlowControlledInfo) {
967
- for (const ui32 oldSize = FlowControlledInfo->Rings .size (); ringIdx < oldSize; ++ringIdx) {
968
- for (TActorId outdated : FlowControlledInfo->Rings [oldSize].Replicas )
969
- Send (outdated, new TEvents::TEvPoison ());
970
- }
971
- }
972
-
973
- FlowControlledInfo = std::move (updated);
974
- }
975
-
976
907
void Registered (TActorSystem* sys, const TActorId&) {
977
- RegisterDerivedServices (sys, nullptr );
908
+ RegisterReplicaProbes (sys);
978
909
}
979
910
public:
980
911
static constexpr NKikimrServices::TActivity::EType ActorActivityType () {
0 commit comments