Skip to content

Commit 3b3d383

Browse files
committed
Add test for multiple marker state
Also includes `paginateUntilMessageCheckOff` from #214
1 parent b53ec21 commit 3b3d383

File tree

1 file changed

+191
-17
lines changed

1 file changed

+191
-17
lines changed

tests/msc2716_test.go

+191-17
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,7 @@ func TestImportHistoricalMessages(t *testing.T) {
919919
})
920920
})
921921

922-
t.Run("Historical messages show up for remote federated homeserver even the homeserver is missing the part of the timeline where the marker was sent and it paginates before it occured", func(t *testing.T) {
922+
t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where the marker was sent and it paginates before it occured", func(t *testing.T) {
923923
t.Parallel()
924924

925925
roomID := as.CreateRoom(t, createPublicRoomOpts)
@@ -964,7 +964,11 @@ func TestImportHistoricalMessages(t *testing.T) {
964964
baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id")
965965

966966
// Send the marker event which lets remote homeservers know there are
967-
// some historical messages back at the given insertion event.
967+
// some historical messages back at the given insertion event. We
968+
// purposely use the local user Alice here as remoteCharlie isn't even
969+
// in the room at this point in time and even if they were, the purpose
970+
// of this test is to make sure the remote-join will pick up the state,
971+
// not our backfill here.
968972
sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID)
969973

970974
// Add some events after the marker so that remoteCharlie doesn't see the marker
@@ -991,24 +995,97 @@ func TestImportHistoricalMessages(t *testing.T) {
991995
// We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID,
992996
// batchSendRes, false)` here because it tests against the full message
993997
// response and we need to skip past the marker in the timeline.
994-
messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
995-
"dir": []string{"b"},
996-
"limit": []string{"100"},
997-
"from": []string{paginationTokenBeforeMarker},
998-
}))
998+
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, paginationTokenBeforeMarker, historicalEventIDs, []string{})
999+
})
9991000

1000-
// Make sure all of the historical events are present
1001-
must.MatchResponse(t, messagesRes, match.HTTPResponse{
1002-
JSON: []match.JSON{
1003-
match.JSONCheckOffAllowUnwanted("chunk", makeInterfaceSlice(historicalEventIDs), func(r gjson.Result) interface{} {
1004-
return r.Get("event_id").Str
1005-
}, nil),
1006-
},
1001+
t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where multiple marker events were sent and it paginates before they occured", func(t *testing.T) {
1002+
t.Parallel()
1003+
1004+
roomID := as.CreateRoom(t, createPublicRoomOpts)
1005+
alice.JoinRoom(t, roomID, nil)
1006+
1007+
// Anything above 1 here should be sufficient to test whether we can
1008+
// follow the state and previous state all the way up to injest all of
1009+
// the marker events along the way
1010+
numBatches := 2
1011+
1012+
eventIDsBefore := createMessagesInRoom(t, alice, roomID, numBatches, "eventIDsBefore")
1013+
timeAfterEventBefore := time.Now()
1014+
1015+
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")
1016+
eventIDAfter := eventIDsAfter[0]
1017+
1018+
// Join the room from a remote homeserver before the historical messages were sent
1019+
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
1020+
1021+
// Make sure all of the events have been backfilled for the remote user
1022+
// before we leave the room
1023+
fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool {
1024+
if ev.Get("event_id").Str == eventIDsBefore[0] {
1025+
return true
1026+
}
1027+
1028+
return false
10071029
})
1008-
})
10091030

1010-
t.Run("TODO: do multiple marker events as state to see if homeserver can follow the state update chain and get all history", func(t *testing.T) {
1011-
t.Skip("Skipping until I write this test")
1031+
// Leave before the historical messages are imported
1032+
remoteCharlie.LeaveRoom(t, roomID)
1033+
1034+
var expectedEventIDs []string
1035+
for i := 0; i < numBatches; i++ {
1036+
// Create separate disconnected batches
1037+
batchSendRes := batchSendHistoricalMessages(
1038+
t,
1039+
as,
1040+
roomID,
1041+
eventIDsBefore[i],
1042+
"",
1043+
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
1044+
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2),
1045+
// Status
1046+
200,
1047+
)
1048+
batchSendResBody := client.ParseJSON(t, batchSendRes)
1049+
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
1050+
baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id")
1051+
1052+
// Store the historical events we will expect to see later
1053+
expectedEventIDs = append(expectedEventIDs, historicalEventIDs...)
1054+
1055+
// Send the marker event which lets remote homeservers know there are
1056+
// some historical messages back at the given insertion event. We
1057+
// purposely use the local user Alice here as remoteCharlie isn't even
1058+
// in the room at this point in time and even if they were, the purpose
1059+
// of this test is to make sure the remote-join will pick up the state,
1060+
// not our backfill here.
1061+
sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID)
1062+
}
1063+
1064+
// Add some events after the marker so that remoteCharlie doesn't see the marker
1065+
createMessagesInRoom(t, alice, roomID, 3, "eventIDFiller")
1066+
1067+
// Join the room from a remote homeserver after the historical messages were sent
1068+
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
1069+
1070+
// From the remote user, make a /context request for eventIDAfter to get
1071+
// pagination token before the marker event
1072+
contextRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", eventIDAfter}, client.WithContentType("application/json"), client.WithQueries(url.Values{
1073+
"limit": []string{"0"},
1074+
}))
1075+
contextResResBody := client.ParseJSON(t, contextRes)
1076+
paginationTokenBeforeMarker := client.GetJSONFieldStr(t, contextResResBody, "end")
1077+
1078+
// Start the /messages request from that pagination token which
1079+
// jumps/skips over the marker event in the timeline. This is the key
1080+
// part of the test. We want to make sure that new marker state can be
1081+
// injested and processed to reveal the imported history after a
1082+
// remote-join without paginating and backfilling over the spot in the
1083+
// timeline with the marker event.
1084+
//
1085+
// We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID,
1086+
// batchSendRes, false)` here because it tests against the full message
1087+
// response and we need to skip past the marker in the timeline.
1088+
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, paginationTokenBeforeMarker, expectedEventIDs, []string{})
10121089
})
10131090
})
10141091

@@ -1184,6 +1261,103 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
11841261
}
11851262
}
11861263

1264+
// Paginate the /messages endpoint until we find all of the expectedEventIds
1265+
// (order does not matter). If any event in denyListEventIDs is found, an error
1266+
// will be thrown.
1267+
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, fromPaginationToken string, expectedEventIDs []string, denyListEventIDs []string) {
1268+
t.Helper()
1269+
start := time.Now()
1270+
1271+
workingExpectedEventIDMap := make(map[string]string)
1272+
for _, expectedEventID := range expectedEventIDs {
1273+
workingExpectedEventIDMap[expectedEventID] = expectedEventID
1274+
}
1275+
1276+
denyEventIDMap := make(map[string]string)
1277+
for _, denyEventID := range denyListEventIDs {
1278+
denyEventIDMap[denyEventID] = denyEventID
1279+
}
1280+
1281+
var actualEventIDList []string
1282+
callCounter := 0
1283+
messageResEnd := fromPaginationToken
1284+
generateErrorMesssageInfo := func() string {
1285+
i := 0
1286+
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap))
1287+
for eventID := range workingExpectedEventIDMap {
1288+
leftoverEventIDs[i] = eventID
1289+
i++
1290+
}
1291+
1292+
return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s",
1293+
callCounter,
1294+
len(expectedEventIDs)-len(leftoverEventIDs),
1295+
len(expectedEventIDs),
1296+
len(leftoverEventIDs),
1297+
leftoverEventIDs,
1298+
len(actualEventIDList),
1299+
actualEventIDList,
1300+
)
1301+
}
1302+
1303+
for {
1304+
if time.Since(start) > 200*c.SyncUntilTimeout {
1305+
t.Fatalf(
1306+
"paginateUntilMessageCheckOff timed out. %s",
1307+
generateErrorMesssageInfo(),
1308+
)
1309+
}
1310+
1311+
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
1312+
"dir": []string{"b"},
1313+
"limit": []string{"100"},
1314+
"from": []string{messageResEnd},
1315+
}))
1316+
callCounter++
1317+
messsageResBody := client.ParseJSON(t, messagesRes)
1318+
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")
1319+
// Since the original body can only be read once, create a new one from the body bytes we just read
1320+
messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody))
1321+
1322+
foundEventInMessageResponse := false
1323+
must.MatchResponse(t, messagesRes, match.HTTPResponse{
1324+
JSON: []match.JSON{
1325+
match.JSONArrayEach("chunk", func(ev gjson.Result) error {
1326+
foundEventInMessageResponse = true
1327+
eventID := ev.Get("event_id").Str
1328+
actualEventIDList = append(actualEventIDList, eventID)
1329+
1330+
if _, keyExists := denyEventIDMap[eventID]; keyExists {
1331+
return fmt.Errorf(
1332+
"paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s",
1333+
eventID,
1334+
generateErrorMesssageInfo(),
1335+
)
1336+
}
1337+
1338+
if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists {
1339+
delete(workingExpectedEventIDMap, eventID)
1340+
}
1341+
1342+
return nil
1343+
}),
1344+
},
1345+
})
1346+
1347+
if !foundEventInMessageResponse {
1348+
t.Fatalf(
1349+
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s",
1350+
generateErrorMesssageInfo(),
1351+
)
1352+
}
1353+
1354+
// We were able to find all of the expected events!
1355+
if len(workingExpectedEventIDMap) == 0 {
1356+
return
1357+
}
1358+
}
1359+
}
1360+
11871361
func historicalEventFilter(r gjson.Result) bool {
11881362
// This includes messages, insertion, batch, and marker events because we
11891363
// include the historical field on all of them.

0 commit comments

Comments
 (0)