20
20
import logging
21
21
from enum import Enum
22
22
from http import HTTPStatus
23
- from typing import TYPE_CHECKING , Dict , Iterable , List , Optional , Tuple , Union
23
+ from typing import (
24
+ TYPE_CHECKING ,
25
+ Collection ,
26
+ Dict ,
27
+ Iterable ,
28
+ List ,
29
+ Optional ,
30
+ Tuple ,
31
+ Union ,
32
+ )
24
33
25
34
import attr
26
35
from signedjson .key import decode_verify_key_bytes
34
43
CodeMessageException ,
35
44
Codes ,
36
45
FederationDeniedError ,
46
+ FederationError ,
37
47
HttpResponseException ,
38
48
NotFoundError ,
39
49
RequestSendFailed ,
@@ -545,7 +555,8 @@ async def do_invite_join(
545
555
run_as_background_process (
546
556
desc = "sync_partial_state_room" ,
547
557
func = self ._sync_partial_state_room ,
548
- destination = origin ,
558
+ initial_destination = origin ,
559
+ other_destinations = ret .servers_in_room ,
549
560
room_id = room_id ,
550
561
)
551
562
@@ -1454,13 +1465,16 @@ async def get_room_complexity(
1454
1465
1455
1466
async def _sync_partial_state_room (
1456
1467
self ,
1457
- destination : str ,
1468
+ initial_destination : Optional [str ],
1469
+ other_destinations : Collection [str ],
1458
1470
room_id : str ,
1459
1471
) -> None :
1460
1472
"""Background process to resync the state of a partial-state room
1461
1473
1462
1474
Args:
1463
- destination: homeserver to pull the state from
1475
+ initial_destination: the initial homeserver to pull the state from
1476
+ other_destinations: other homeservers to try to pull the state from, if
1477
+ `initial_destination` is unavailable
1464
1478
room_id: room to be resynced
1465
1479
"""
1466
1480
@@ -1472,8 +1486,29 @@ async def _sync_partial_state_room(
1472
1486
# really leave, that might mean we have difficulty getting the room state over
1473
1487
# federation.
1474
1488
#
1475
- # TODO(faster_joins): try other destinations if the one we have fails
1489
+ # TODO(faster_joins): we need some way of prioritising which homeservers in
1490
+ # `other_destinations` to try first, otherwise we'll spend ages trying dead
1491
+ # homeservers for large rooms.
1492
+
1493
+ if initial_destination is None and len (other_destinations ) == 0 :
1494
+ raise ValueError (
1495
+ f"Cannot resync state of { room_id } : no destinations provided"
1496
+ )
1476
1497
1498
+ # Make an infinite iterator of destinations to try. Once we find a working
1499
+ # destination, we'll stick with it until it flakes.
1500
+ if initial_destination is not None :
1501
+ # Move `initial_destination` to the front of the list.
1502
+ destinations = list (other_destinations )
1503
+ if initial_destination in destinations :
1504
+ destinations .remove (initial_destination )
1505
+ destinations = [initial_destination ] + destinations
1506
+ destination_iter = itertools .cycle (destinations )
1507
+ else :
1508
+ destination_iter = itertools .cycle (other_destinations )
1509
+
1510
+ # `destination` is the current remote homeserver we're pulling from.
1511
+ destination = next (destination_iter )
1477
1512
logger .info ("Syncing state for room %s via %s" , room_id , destination )
1478
1513
1479
1514
# we work through the queue in order of increasing stream ordering.
@@ -1511,6 +1546,41 @@ async def _sync_partial_state_room(
1511
1546
allow_rejected = True ,
1512
1547
)
1513
1548
for event in events :
1514
- await self ._federation_event_handler .update_state_for_partial_state_event (
1515
- destination , event
1516
- )
1549
+ for attempt in itertools .count ():
1550
+ try :
1551
+ await self ._federation_event_handler .update_state_for_partial_state_event (
1552
+ destination , event
1553
+ )
1554
+ break
1555
+ except FederationError as e :
1556
+ if attempt == len (destinations ) - 1 :
1557
+ # We have tried every remote server for this event. Give up.
1558
+ # TODO(faster_joins) giving up isn't the right thing to do
1559
+ # if there's a temporary network outage. retrying
1560
+ # indefinitely is also not the right thing to do if we can
1561
+ # reach all homeservers and they all claim they don't have
1562
+ # the state we want.
1563
+ logger .error (
1564
+ "Failed to get state for %s at %s from %s because %s, "
1565
+ "giving up!" ,
1566
+ room_id ,
1567
+ event ,
1568
+ destination ,
1569
+ e ,
1570
+ )
1571
+ raise
1572
+
1573
+ # Try the next remote server.
1574
+ logger .info (
1575
+ "Failed to get state for %s at %s from %s because %s" ,
1576
+ room_id ,
1577
+ event ,
1578
+ destination ,
1579
+ e ,
1580
+ )
1581
+ destination = next (destination_iter )
1582
+ logger .info (
1583
+ "Syncing state for room %s via %s instead" ,
1584
+ room_id ,
1585
+ destination ,
1586
+ )
0 commit comments