@@ -918,6 +918,114 @@ func TestImportHistoricalMessages(t *testing.T) {
918
918
},
919
919
})
920
920
})
921
+
922
+ // We're testing to make sure historical messages show up for a remote
923
+ // federated homeserver even when the homeserver is missing the part of
924
+ // the timeline where the marker events were sent and it paginates before
925
+ // they occured to see if the history is available. Making sure the
926
+ // homeserver processes all of the markers from the current state instead
927
+ // of just when it sees them in the timeline.
928
+ testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker := func (t * testing.T , numBatches int ) {
929
+ t .Helper ()
930
+
931
+ roomID := as .CreateRoom (t , createPublicRoomOpts )
932
+ alice .JoinRoom (t , roomID , nil )
933
+
934
+ eventIDsBefore := createMessagesInRoom (t , alice , roomID , numBatches , "eventIDsBefore" )
935
+ timeAfterEventBefore := time .Now ()
936
+
937
+ eventIDsAfter := createMessagesInRoom (t , alice , roomID , 3 , "eventIDsAfter" )
938
+ eventIDAfter := eventIDsAfter [0 ]
939
+
940
+ // Join the room from a remote homeserver before the historical messages were sent
941
+ remoteCharlie .JoinRoom (t , roomID , []string {"hs1" })
942
+
943
+ // Make sure all of the events have been backfilled for the remote user
944
+ // before we leave the room
945
+ fetchUntilMessagesResponseHas (t , remoteCharlie , roomID , func (ev gjson.Result ) bool {
946
+ if ev .Get ("event_id" ).Str == eventIDsBefore [0 ] {
947
+ return true
948
+ }
949
+
950
+ return false
951
+ })
952
+
953
+ // Leave before the historical messages are imported
954
+ remoteCharlie .LeaveRoom (t , roomID )
955
+
956
+ var expectedEventIDs []string
957
+ for i := 0 ; i < numBatches ; i ++ {
958
+ // Create separate disconnected batches
959
+ batchSendRes := batchSendHistoricalMessages (
960
+ t ,
961
+ as ,
962
+ roomID ,
963
+ eventIDsBefore [i ],
964
+ "" ,
965
+ createJoinStateEventsForBatchSendRequest ([]string {virtualUserID }, timeAfterEventBefore ),
966
+ createMessageEventsForBatchSendRequest ([]string {virtualUserID }, timeAfterEventBefore , 2 ),
967
+ // Status
968
+ 200 ,
969
+ )
970
+ batchSendResBody := client .ParseJSON (t , batchSendRes )
971
+ historicalEventIDs := client .GetJSONFieldStringArray (t , batchSendResBody , "event_ids" )
972
+ baseInsertionEventID := client .GetJSONFieldStr (t , batchSendResBody , "base_insertion_event_id" )
973
+
974
+ // Store the historical events we will expect to see later
975
+ expectedEventIDs = append (expectedEventIDs , historicalEventIDs ... )
976
+
977
+ // Send the marker event which lets remote homeservers know there are
978
+ // some historical messages back at the given insertion event. We
979
+ // purposely use the local user Alice here as remoteCharlie isn't even
980
+ // in the room at this point in time and even if they were, the purpose
981
+ // of this test is to make sure the remote-join will pick up the state,
982
+ // not our backfill here.
983
+ sendMarkerAndEnsureBackfilled (t , as , alice , roomID , baseInsertionEventID )
984
+ }
985
+
986
+ // Add some events after the marker so that remoteCharlie doesn't see the marker
987
+ createMessagesInRoom (t , alice , roomID , 3 , "eventIDFiller" )
988
+
989
+ // Join the room from a remote homeserver after the historical messages were sent
990
+ remoteCharlie .JoinRoom (t , roomID , []string {"hs1" })
991
+
992
+ // From the remote user, make a /context request for eventIDAfter to get
993
+ // pagination token before the marker event
994
+ contextRes := remoteCharlie .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "context" , eventIDAfter }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
995
+ "limit" : []string {"0" },
996
+ }))
997
+ contextResResBody := client .ParseJSON (t , contextRes )
998
+ paginationTokenBeforeMarker := client .GetJSONFieldStr (t , contextResResBody , "end" )
999
+
1000
+ // Start the /messages request from that pagination token which
1001
+ // jumps/skips over the marker event in the timeline. This is the key
1002
+ // part of the test. We want to make sure that new marker state can be
1003
+ // injested and processed to reveal the imported history after a
1004
+ // remote-join without paginating and backfilling over the spot in the
1005
+ // timeline with the marker event.
1006
+ //
1007
+ // We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID,
1008
+ // batchSendRes, false)` here because it tests against the full message
1009
+ // response and we need to skip past the marker in the timeline.
1010
+ paginateUntilMessageCheckOff (t , remoteCharlie , roomID , paginationTokenBeforeMarker , expectedEventIDs , []string {})
1011
+ }
1012
+
1013
+ t .Run ("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where the marker was sent and it paginates before it occured" , func (t * testing.T ) {
1014
+ t .Parallel ()
1015
+
1016
+ testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker (t , 1 )
1017
+ })
1018
+
1019
+ t .Run ("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where multiple marker events were sent and it paginates before they occured" , func (t * testing.T ) {
1020
+ t .Parallel ()
1021
+
1022
+ testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker (
1023
+ t ,
1024
+ // Anything above 1 here should be sufficient to test whether we can
1025
+ // process all of the current state to injest all of the marker events
1026
+ 2 ,
1027
+ )
1028
+ })
921
1029
})
922
1030
923
1031
t .Run ("Existing room versions" , func (t * testing.T ) {
@@ -1092,6 +1200,103 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
1092
1200
}
1093
1201
}
1094
1202
1203
+ // Paginate the /messages endpoint until we find all of the expectedEventIds
1204
+ // (order does not matter). If any event in denyListEventIDs is found, an error
1205
+ // will be thrown.
1206
+ func paginateUntilMessageCheckOff (t * testing.T , c * client.CSAPI , roomID string , fromPaginationToken string , expectedEventIDs []string , denyListEventIDs []string ) {
1207
+ t .Helper ()
1208
+ start := time .Now ()
1209
+
1210
+ workingExpectedEventIDMap := make (map [string ]string )
1211
+ for _ , expectedEventID := range expectedEventIDs {
1212
+ workingExpectedEventIDMap [expectedEventID ] = expectedEventID
1213
+ }
1214
+
1215
+ denyEventIDMap := make (map [string ]string )
1216
+ for _ , denyEventID := range denyListEventIDs {
1217
+ denyEventIDMap [denyEventID ] = denyEventID
1218
+ }
1219
+
1220
+ var actualEventIDList []string
1221
+ callCounter := 0
1222
+ messageResEnd := fromPaginationToken
1223
+ generateErrorMesssageInfo := func () string {
1224
+ i := 0
1225
+ leftoverEventIDs := make ([]string , len (workingExpectedEventIDMap ))
1226
+ for eventID := range workingExpectedEventIDMap {
1227
+ leftoverEventIDs [i ] = eventID
1228
+ i ++
1229
+ }
1230
+
1231
+ return fmt .Sprintf ("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s" ,
1232
+ callCounter ,
1233
+ len (expectedEventIDs )- len (leftoverEventIDs ),
1234
+ len (expectedEventIDs ),
1235
+ len (leftoverEventIDs ),
1236
+ leftoverEventIDs ,
1237
+ len (actualEventIDList ),
1238
+ actualEventIDList ,
1239
+ )
1240
+ }
1241
+
1242
+ for {
1243
+ if time .Since (start ) > c .SyncUntilTimeout {
1244
+ t .Fatalf (
1245
+ "paginateUntilMessageCheckOff timed out. %s" ,
1246
+ generateErrorMesssageInfo (),
1247
+ )
1248
+ }
1249
+
1250
+ messagesRes := c .MustDoFunc (t , "GET" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "messages" }, client .WithContentType ("application/json" ), client .WithQueries (url.Values {
1251
+ "dir" : []string {"b" },
1252
+ "limit" : []string {"100" },
1253
+ "from" : []string {messageResEnd },
1254
+ }))
1255
+ callCounter ++
1256
+ messsageResBody := client .ParseJSON (t , messagesRes )
1257
+ messageResEnd = client .GetJSONFieldStr (t , messsageResBody , "end" )
1258
+ // Since the original body can only be read once, create a new one from the body bytes we just read
1259
+ messagesRes .Body = ioutil .NopCloser (bytes .NewBuffer (messsageResBody ))
1260
+
1261
+ foundEventInMessageResponse := false
1262
+ must .MatchResponse (t , messagesRes , match.HTTPResponse {
1263
+ JSON : []match.JSON {
1264
+ match .JSONArrayEach ("chunk" , func (ev gjson.Result ) error {
1265
+ foundEventInMessageResponse = true
1266
+ eventID := ev .Get ("event_id" ).Str
1267
+ actualEventIDList = append (actualEventIDList , eventID )
1268
+
1269
+ if _ , keyExists := denyEventIDMap [eventID ]; keyExists {
1270
+ return fmt .Errorf (
1271
+ "paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s" ,
1272
+ eventID ,
1273
+ generateErrorMesssageInfo (),
1274
+ )
1275
+ }
1276
+
1277
+ if _ , keyExists := workingExpectedEventIDMap [eventID ]; keyExists {
1278
+ delete (workingExpectedEventIDMap , eventID )
1279
+ }
1280
+
1281
+ return nil
1282
+ }),
1283
+ },
1284
+ })
1285
+
1286
+ if ! foundEventInMessageResponse {
1287
+ t .Fatalf (
1288
+ "paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s" ,
1289
+ generateErrorMesssageInfo (),
1290
+ )
1291
+ }
1292
+
1293
+ // We were able to find all of the expected events!
1294
+ if len (workingExpectedEventIDMap ) == 0 {
1295
+ return
1296
+ }
1297
+ }
1298
+ }
1299
+
1095
1300
func historicalEventFilter (r gjson.Result ) bool {
1096
1301
// This includes messages, insertion, batch, and marker events because we
1097
1302
// include the historical field on all of them.
@@ -1169,9 +1374,10 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA
1169
1374
markerInsertionContentField : insertionEventID ,
1170
1375
},
1171
1376
}
1172
- // We can't use as.SendEventSynced(...) because application services can't use the /sync API
1173
- txnId := getTxnID ("sendMarkerAndEnsureBackfilled-txn" )
1174
- markerSendRes := as .MustDoFunc (t , "PUT" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "send" , markerEvent .Type , txnId }, client .WithJSONBody (t , markerEvent .Content ))
1377
+ // Marker events should have unique state_key so they all show up in the current state to process.
1378
+ unique_state_key := getTxnID ("marker_state_key" )
1379
+ // We can't use as.SendEventSynced(...) because application services can't use the /sync API.
1380
+ markerSendRes := as .MustDoFunc (t , "PUT" , []string {"_matrix" , "client" , "r0" , "rooms" , roomID , "state" , markerEvent .Type , unique_state_key }, client .WithJSONBody (t , markerEvent .Content ))
1175
1381
markerSendBody := client .ParseJSON (t , markerSendRes )
1176
1382
markerEventID = client .GetJSONFieldStr (t , markerSendBody , "event_id" )
1177
1383
0 commit comments