@@ -50,12 +50,12 @@ class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
50
50
};
51
51
52
52
using TBase = TActor<TDatabaseSubscriberActor>;
53
- using TDatabaseStatePtr = typename std::list<TDatabaseState>::iterator;
54
53
55
54
public:
56
55
TDatabaseSubscriberActor (TDuration idleTimeout)
57
56
: TBase(&TDatabaseSubscriberActor::StateFunc)
58
57
, IdleTimeout(idleTimeout)
58
+ , DatabaseStates(std::numeric_limits<size_t >::max())
59
59
{}
60
60
61
61
void Registered (TActorSystem* sys, const TActorId& owner) {
@@ -65,86 +65,76 @@ class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
65
65
66
66
void Handle (TEvPrivate::TEvSubscribeOnDatabase::TPtr& ev) {
67
67
const TString& database = ev->Get ()->Database ;
68
- const auto it = DatabasePathToState. find (database);
68
+ auto databaseStateIt = DatabaseStates. Find (database);
69
69
70
- if (it == DatabasePathToState.end ()) {
71
- DatabaseStates.emplace_front (TDatabaseState{.Database = database});
72
- DatabasePathToState.insert ({database, DatabaseStates.begin ()});
70
+ if (databaseStateIt == DatabaseStates.End ()) {
71
+ DatabaseStates.Insert ({database, TDatabaseState{.Database = database}});
73
72
Register (NWorkload::CreateDatabaseFetcherActor (SelfId (), database));
74
73
StartIdleCheck ();
75
74
return ;
76
75
}
77
76
78
- const auto databaseState = it-> second ;
79
- if (databaseState ->DatabaseId ) {
80
- SendSubscriberInfo (*databaseState , Ydb::StatusIds::SUCCESS);
77
+ databaseStateIt-> LastUpdateTime = TInstant::Now () ;
78
+ if (databaseStateIt ->DatabaseId ) {
79
+ SendSubscriberInfo (*databaseStateIt , Ydb::StatusIds::SUCCESS);
81
80
}
82
81
}
83
82
84
83
void Handle (TEvPrivate::TEvPingDatabaseSubscription::TPtr& ev) {
85
- const auto it = DatabasePathToState. find (ev->Get ()->Database );
86
- if (it == DatabasePathToState. end ()) {
87
- return ;
84
+ auto databaseStateIt = DatabaseStates. Find (ev->Get ()->Database );
85
+ if (databaseStateIt != DatabaseStates. End ()) {
86
+ databaseStateIt-> LastUpdateTime = TInstant::Now () ;
88
87
}
89
-
90
- TDatabaseState databaseState = *it->second ;
91
- databaseState.LastUpdateTime = TInstant::Now ();
92
-
93
- DatabaseStates.erase (it->second );
94
- DatabaseStates.emplace_front (databaseState);
95
- it->second = DatabaseStates.begin ();
96
88
}
97
89
98
90
void Handle (NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
99
- const auto it = DatabasePathToState. find (ev->Get ()->Database );
100
- if (it == DatabasePathToState. end ()) {
91
+ auto databaseStateIt = DatabaseStates. Find (ev->Get ()->Database );
92
+ if (databaseStateIt == DatabaseStates. End ()) {
101
93
return ;
102
94
}
103
95
104
- const auto databaseState = it->second ;
105
- databaseState->FetchRequestIsRunning = false ;
106
- UpdateDatabaseState (*databaseState, ev->Get ()->PathId , ev->Get ()->Serverless );
107
- SendSubscriberInfo (*databaseState, ev->Get ()->Status , ev->Get ()->Issues );
96
+ databaseStateIt->FetchRequestIsRunning = false ;
97
+ UpdateDatabaseState (*databaseStateIt, ev->Get ()->PathId , ev->Get ()->Serverless );
98
+ SendSubscriberInfo (*databaseStateIt, ev->Get ()->Status , ev->Get ()->Issues );
108
99
109
100
if (ev->Get ()->Status == Ydb::StatusIds::SUCCESS) {
110
101
FreeWatchKey++;
111
- databaseState ->WatchKey = FreeWatchKey;
102
+ databaseStateIt ->WatchKey = FreeWatchKey;
112
103
Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvWatchPathId (ev->Get ()->PathId , FreeWatchKey));
113
104
}
114
105
}
115
106
116
107
void Handle (TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
117
- const auto it = DatabasePathToState. find (ev->Get ()->Path );
118
- if (it == DatabasePathToState. end ()) {
108
+ auto databaseStateIt = DatabaseStates. Find (ev->Get ()->Path );
109
+ if (databaseStateIt == DatabaseStates. End ()) {
119
110
return ;
120
111
}
121
112
122
- const auto databaseState = it->second ;
123
- UnsubscribeFromSchemeCache (*databaseState);
124
- SendSubscriberInfo (*databaseState, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{" Database was dropped" }});
125
- DatabasePathToState.erase (it);
126
- DatabaseStates.erase (databaseState);
113
+ UnsubscribeFromSchemeCache (*databaseStateIt);
114
+ SendSubscriberInfo (*databaseStateIt, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{" Database was dropped" }});
115
+ DatabaseStates.Erase (databaseStateIt);
127
116
}
128
117
129
118
void HandlePoison () {
130
- for (auto & databaseState : DatabaseStates) {
131
- UnsubscribeFromSchemeCache (databaseState);
132
- }
133
-
119
+ Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvWatchRemove (0 ));
134
120
TBase::PassAway ();
135
121
}
136
122
137
123
void HandleWakeup () {
138
124
IdleCheckStarted = false ;
139
125
const auto minimalTime = TInstant::Now () - IdleTimeout;
140
- while (!DatabaseStates.empty () && DatabaseStates.back ().LastUpdateTime <= minimalTime) {
141
- UnsubscribeFromSchemeCache (DatabaseStates.back ());
142
- SendSubscriberInfo (DatabaseStates.back (), Ydb::StatusIds::ABORTED, {NYql::TIssue{" Database subscription was dropped by idle timeout" }});
143
- DatabasePathToState.erase (DatabaseStates.back ().Database );
144
- DatabaseStates.pop_back ();
126
+ while (!DatabaseStates.Empty ()) {
127
+ auto oldestIt = DatabaseStates.FindOldest ();
128
+ if (oldestIt->LastUpdateTime > minimalTime) {
129
+ break ;
130
+ }
131
+
132
+ UnsubscribeFromSchemeCache (*oldestIt);
133
+ SendSubscriberInfo (*oldestIt, Ydb::StatusIds::ABORTED, {NYql::TIssue{" Database subscription was dropped by idle timeout" }});
134
+ DatabaseStates.Erase (oldestIt);
145
135
}
146
136
147
- if (!DatabaseStates.empty ()) {
137
+ if (!DatabaseStates.Empty ()) {
148
138
StartIdleCheck ();
149
139
}
150
140
}
@@ -162,6 +152,7 @@ class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
162
152
163
153
private:
164
154
static void UpdateDatabaseState(TDatabaseState& databaseState, TPathId pathId, bool serverless) {
155
+ databaseState.LastUpdateTime = TInstant::Now ();
165
156
databaseState.DatabaseId = (serverless ? TStringBuilder () << pathId.OwnerId << " :" << pathId.LocalPathId << " :" : TStringBuilder ()) << databaseState.Database ;
166
157
databaseState.Serverless = serverless;
167
158
}
@@ -173,7 +164,7 @@ class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
173
164
}
174
165
}
175
166
176
- void SendSubscriberInfo (TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
167
+ void SendSubscriberInfo (const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
177
168
if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) {
178
169
Send (Owner, new TEvKqp::TEvUpdateDatabaseInfo (databaseState.Database , databaseState.DatabaseId , databaseState.Serverless ));
179
170
} else {
@@ -197,8 +188,7 @@ class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
197
188
TActorId Owner;
198
189
bool IdleCheckStarted = false ;
199
190
200
- std::unordered_map<TString, TDatabaseStatePtr> DatabasePathToState;
201
- std::list<TDatabaseState> DatabaseStates;
191
+ TLRUCache<TString, TDatabaseState> DatabaseStates;
202
192
ui32 FreeWatchKey = 0 ;
203
193
};
204
194
0 commit comments