@@ -419,8 +419,6 @@ async def process_remote_join(
419
419
Raises:
420
420
SynapseError if the response is in some way invalid.
421
421
"""
422
- event_map = {e .event_id : e for e in itertools .chain (auth_events , state )}
423
-
424
422
create_event = None
425
423
for e in auth_events :
426
424
if (e .type , e .state_key ) == (EventTypes .Create , "" ):
@@ -439,11 +437,6 @@ async def process_remote_join(
439
437
if room_version .identifier != room_version_id :
440
438
raise SynapseError (400 , "Room version mismatch" )
441
439
442
- # filter out any events we have already seen
443
- seen_remotes = await self ._store .have_seen_events (room_id , event_map .keys ())
444
- for s in seen_remotes :
445
- event_map .pop (s , None )
446
-
447
440
# persist the auth chain and state events.
448
441
#
449
442
# any invalid events here will be marked as rejected, and we'll carry on.
@@ -455,7 +448,9 @@ async def process_remote_join(
455
448
# signatures right now doesn't mean that we will *never* be able to, so it
456
449
# is premature to reject them.
457
450
#
458
- await self ._auth_and_persist_outliers (room_id , event_map .values ())
451
+ await self ._auth_and_persist_outliers (
452
+ room_id , itertools .chain (auth_events , state )
453
+ )
459
454
460
455
# and now persist the join event itself.
461
456
logger .info ("Peristing join-via-remote %s" , event )
@@ -1245,6 +1240,16 @@ async def _auth_and_persist_outliers(
1245
1240
"""
1246
1241
event_map = {event .event_id : event for event in events }
1247
1242
1243
+ # filter out any events we have already seen. This might happen because
1244
+ # the events were eagerly pushed to us (eg, during a room join), or because
1245
+ # another thread has raced against us since we decided to request the event.
1246
+ #
1247
+ # This is just an optimisation, so it doesn't need to be watertight - the event
1248
+ # persister does another round of deduplication.
1249
+ seen_remotes = await self ._store .have_seen_events (room_id , event_map .keys ())
1250
+ for s in seen_remotes :
1251
+ event_map .pop (s , None )
1252
+
1248
1253
# XXX: it might be possible to kick this process off in parallel with fetching
1249
1254
# the events.
1250
1255
while event_map :
@@ -1717,31 +1722,22 @@ async def _get_remote_auth_chain_for_event(
1717
1722
event_id: the event for which we are lacking auth events
1718
1723
"""
1719
1724
try :
1720
- remote_event_map = {
1721
- e .event_id : e
1722
- for e in await self ._federation_client .get_event_auth (
1723
- destination , room_id , event_id
1724
- )
1725
- }
1725
+ remote_events = await self ._federation_client .get_event_auth (
1726
+ destination , room_id , event_id
1727
+ )
1728
+
1726
1729
except RequestSendFailed as e1 :
1727
1730
# The other side isn't around or doesn't implement the
1728
1731
# endpoint, so lets just bail out.
1729
1732
logger .info ("Failed to get event auth from remote: %s" , e1 )
1730
1733
return
1731
1734
1732
- logger .info ("/event_auth returned %i events" , len (remote_event_map ))
1735
+ logger .info ("/event_auth returned %i events" , len (remote_events ))
1733
1736
1734
1737
# `event` may be returned, but we should not yet process it.
1735
- remote_event_map .pop (event_id , None )
1736
-
1737
- # nor should we reprocess any events we have already seen.
1738
- seen_remotes = await self ._store .have_seen_events (
1739
- room_id , remote_event_map .keys ()
1740
- )
1741
- for s in seen_remotes :
1742
- remote_event_map .pop (s , None )
1738
+ remote_auth_events = (e for e in remote_events if e .event_id != event_id )
1743
1739
1744
- await self ._auth_and_persist_outliers (room_id , remote_event_map . values () )
1740
+ await self ._auth_and_persist_outliers (room_id , remote_auth_events )
1745
1741
1746
1742
async def _update_context_for_auth_events (
1747
1743
self , event : EventBase , context : EventContext , auth_events : StateMap [EventBase ]
0 commit comments