12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
- from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Tuple , cast
15
+ from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Optional , Tuple , cast
16
16
17
17
from synapse .api .presence import PresenceState , UserPresenceState
18
18
from synapse .replication .tcp .streams import PresenceStream
22
22
LoggingDatabaseConnection ,
23
23
LoggingTransaction ,
24
24
)
25
+ from synapse .storage .databases .main .cache import CacheInvalidationWorkerStore
25
26
from synapse .storage .engines import PostgresEngine
26
27
from synapse .storage .types import Connection
27
28
from synapse .storage .util .id_generators import (
@@ -56,7 +57,7 @@ def __init__(
56
57
)
57
58
58
59
59
- class PresenceStore (PresenceBackgroundUpdateStore ):
60
+ class PresenceStore (PresenceBackgroundUpdateStore , CacheInvalidationWorkerStore ):
60
61
def __init__ (
61
62
self ,
62
63
database : DatabasePool ,
@@ -281,20 +282,25 @@ async def should_user_receive_full_presence_with_token(
281
282
True if the user should have full presence sent to them, False otherwise.
282
283
"""
283
284
284
- def _should_user_receive_full_presence_with_token_txn (
285
- txn : LoggingTransaction ,
286
- ) -> bool :
287
- sql = """
288
- SELECT 1 FROM users_to_send_full_presence_to
289
- WHERE user_id = ?
290
- AND presence_stream_id >= ?
291
- """
292
- txn .execute (sql , (user_id , from_token ))
293
- return bool (txn .fetchone ())
285
+ token = await self ._get_when_user_should_receive_full_presence (user_id )
286
+ if token is None :
287
+ return False
294
288
295
- return await self .db_pool .runInteraction (
296
- "should_user_receive_full_presence_with_token" ,
297
- _should_user_receive_full_presence_with_token_txn ,
289
+ return from_token <= token
290
+
291
+ @cached ()
292
+ async def _get_when_user_should_receive_full_presence (
293
+ self , user_id : str
294
+ ) -> Optional [int ]:
295
+ """Return the presence stream token, if any, which should trigger the
296
+ user to receive full presence.
297
+ """
298
+ return await self .db_pool .simple_select_one_onecol (
299
+ table = "users_to_send_full_presence_to" ,
300
+ keyvalues = {"user_id" : user_id },
301
+ retcol = "presence_stream_id" ,
302
+ allow_none = True ,
303
+ desc = "_get_when_user_should_receive_full_presence" ,
298
304
)
299
305
300
306
async def add_users_to_send_full_presence_to (self , user_ids : Iterable [str ]) -> None :
@@ -307,18 +313,28 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> N
307
313
# Add user entries to the table, updating the presence_stream_id column if the user already
308
314
# exists in the table.
309
315
presence_stream_id = self ._presence_id_gen .get_current_token ()
310
- await self .db_pool .simple_upsert_many (
311
- table = "users_to_send_full_presence_to" ,
312
- key_names = ("user_id" ,),
313
- key_values = [(user_id ,) for user_id in user_ids ],
314
- value_names = ("presence_stream_id" ,),
315
- # We save the current presence stream ID token along with the user ID entry so
316
- # that when a user /sync's, even if they syncing multiple times across separate
317
- # devices at different times, each device will receive full presence once - when
318
- # the presence stream ID in their sync token is less than the one in the table
319
- # for their user ID.
320
- value_values = [(presence_stream_id ,) for _ in user_ids ],
321
- desc = "add_users_to_send_full_presence_to" ,
316
+
317
+ def _add_users_to_send_full_presence_to (txn : LoggingTransaction ) -> None :
318
+ self .db_pool .simple_upsert_many_txn (
319
+ txn ,
320
+ table = "users_to_send_full_presence_to" ,
321
+ key_names = ("user_id" ,),
322
+ key_values = [(user_id ,) for user_id in user_ids ],
323
+ value_names = ("presence_stream_id" ,),
324
+ # We save the current presence stream ID token along with the user ID entry so
325
+ # that when a user /sync's, even if they syncing multiple times across separate
326
+ # devices at different times, each device will receive full presence once - when
327
+ # the presence stream ID in their sync token is less than the one in the table
328
+ # for their user ID.
329
+ value_values = [(presence_stream_id ,) for _ in user_ids ],
330
+ )
331
+ for user_id in user_ids :
332
+ self ._invalidate_cache_and_stream (
333
+ txn , self ._get_when_user_should_receive_full_presence , (user_id ,)
334
+ )
335
+
336
+ return await self .db_pool .runInteraction (
337
+ "add_users_to_send_full_presence_to" , _add_users_to_send_full_presence_to
322
338
)
323
339
324
340
async def get_presence_for_all_users (
0 commit comments