@@ -10,6 +10,7 @@ import (
10
10
"context"
11
11
"encoding/json"
12
12
"fmt"
13
+ "github.com/matrix-org/complement/runtime"
13
14
"io/ioutil"
14
15
"net"
15
16
"net/http"
@@ -3636,6 +3637,81 @@ func TestPartialStateJoin(t *testing.T) {
3636
3637
// - Join+Leave+Rejoin works
3637
3638
// - Join + remote kick works
3638
3639
// - Join + remote ban works, then cannot rejoin
3640
+
3641
+ t .Run ("Purge during resync" , func (t * testing.T ) {
3642
+ if runtime .Homeserver != runtime .Synapse {
3643
+ // TOOD: Pull this into a Synapse-specific suite when someone figures out how
3644
+ // to do that (https://github.com/matrix-org/complement/issues/226)
3645
+ t .Skipf ("Skipping test of Synapse-internal API on %s" , runtime .Homeserver )
3646
+ }
3647
+ t .Log ("Alice begins a partial join to a room" )
3648
+ alice := deployment .RegisterUser (t , "hs1" , "t46alice" , "secret" , true )
3649
+ // Ignore PDUs (leaves from shutting down the room) and EDUs (presence).
3650
+ server := createTestServer (
3651
+ t ,
3652
+ deployment ,
3653
+ federation .HandleTransactionRequests (nil , nil ),
3654
+ )
3655
+ cancel := server .Listen ()
3656
+ defer cancel ()
3657
+
3658
+ serverRoom := createTestRoom (t , server , alice .GetDefaultRoomVersion (t ))
3659
+ psjResult := beginPartialStateJoin (t , server , serverRoom , alice )
3660
+ // NB: because we do not end up joined to this room at the end of the test,
3661
+ // we do not `defer psjResult.Destroy(t)` as usual; see the comments below
3662
+ // about races.
3663
+
3664
+ t .Log ("Alice waits to see her join" )
3665
+ alice .MustSyncUntil (
3666
+ t ,
3667
+ client.SyncReq {Filter : buildLazyLoadingSyncFilter (nil )},
3668
+ client .SyncJoinedTo (alice .UserID , serverRoom .RoomID ),
3669
+ )
3670
+
3671
+ // Synapse's partial-state-resync process can race with the purge.
3672
+ // If the purge completes and Synapse then makes a /state_ids request to us
3673
+ // after we've shut down the complement test server, we can end up with flakey
3674
+ // test failures (c.f. https://github.com/matrix-org/synapse/issues/13975)
3675
+ // Avoid this by
3676
+ // - waiting for Synapse to make a state_ids request
3677
+ // - serving the response after the purge (see next comment).
3678
+
3679
+ t .Log ("Wait for /state_ids request" )
3680
+ psjResult .AwaitStateIdsRequest (t )
3681
+
3682
+ t .Log ("Alice purges that room" )
3683
+ alice .MustDoFunc (t , "DELETE" , []string {"_synapse" , "admin" , "v1" , "rooms" , serverRoom .RoomID }, client .WithJSONBody (t , map [string ]interface {}{}))
3684
+
3685
+ // Note: clients don't get told about purged rooms. No leave event for you!
3686
+ t .Log ("Alice does an initial sync after the purge, until the response does not include the purged room" )
3687
+
3688
+ // Note: we retry this sync a few times, because the purge may happen on another
3689
+ // worker to that serving the sync response.
3690
+ queryParams := url.Values {
3691
+ "timeout" : []string {"1000" },
3692
+ "filter" : []string {buildLazyLoadingSyncFilter (nil )},
3693
+ }
3694
+ matcher := match .JSONKeyMissing (
3695
+ fmt .Sprintf ("rooms.join.%s" , client .GjsonEscape (serverRoom .RoomID )),
3696
+ )
3697
+ alice .MustDoFunc (
3698
+ t ,
3699
+ "GET" ,
3700
+ []string {"_matrix" , "client" , "v3" , "sync" },
3701
+ client .WithQueries (queryParams ),
3702
+ client .WithRetryUntil (5 * time .Second , func (res * http.Response ) bool {
3703
+ body := client .ParseJSON (t , res )
3704
+ err := matcher (body )
3705
+ return err == nil
3706
+ }),
3707
+ )
3708
+
3709
+ // Send the state ids response now. Synapse will try to process it and fail
3710
+ // because of the purge. There are no other destinations here so Synapse should
3711
+ // give up the resync process and not make any more requests to the complement
3712
+ // HS, avoiding the flake described above.
3713
+ psjResult .FinishStateRequest ()
3714
+ })
3639
3715
}
3640
3716
3641
3717
// test reception of an event over federation during a resync
0 commit comments