@@ -251,125 +251,151 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
251
251
waitingSyncRequests .Inc ()
252
252
defer waitingSyncRequests .Dec ()
253
253
254
- currentPos := rp .Notifier .CurrentPosition ()
255
-
256
- if ! rp .shouldReturnImmediately (syncReq , currentPos ) {
257
- timer := time .NewTimer (syncReq .Timeout ) // case of timeout=0 is handled above
258
- defer timer .Stop ()
259
-
260
- userStreamListener := rp .Notifier .GetListener (* syncReq )
261
- defer userStreamListener .Close ()
262
-
263
- giveup := func () util.JSONResponse {
264
- syncReq .Log .Debugln ("Responding to sync since client gave up or timeout was reached" )
265
- syncReq .Response .NextBatch = syncReq .Since
266
- // We should always try to include OTKs in sync responses, otherwise clients might upload keys
267
- // even if that's not required. See also:
268
- // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
269
- // Only try to get OTKs if the context isn't already done.
270
- if syncReq .Context .Err () == nil {
271
- err = internal .DeviceOTKCounts (syncReq .Context , rp .keyAPI , syncReq .Device .UserID , syncReq .Device .ID , syncReq .Response )
272
- if err != nil && err != context .Canceled {
273
- syncReq .Log .WithError (err ).Warn ("failed to get OTK counts" )
254
+ // loop until we get some data
255
+ for {
256
+ startTime := time .Now ()
257
+ currentPos := rp .Notifier .CurrentPosition ()
258
+
259
+ // if the since token matches the current positions, wait via the notifier
260
+ if ! rp .shouldReturnImmediately (syncReq , currentPos ) {
261
+ timer := time .NewTimer (syncReq .Timeout ) // case of timeout=0 is handled above
262
+ defer timer .Stop ()
263
+
264
+ userStreamListener := rp .Notifier .GetListener (* syncReq )
265
+ defer userStreamListener .Close ()
266
+
267
+ giveup := func () util.JSONResponse {
268
+ syncReq .Log .Debugln ("Responding to sync since client gave up or timeout was reached" )
269
+ syncReq .Response .NextBatch = syncReq .Since
270
+ // We should always try to include OTKs in sync responses, otherwise clients might upload keys
271
+ // even if that's not required. See also:
272
+ // https://github.com/matrix-org/synapse/blob/29f06704b8871a44926f7c99e73cf4a978fb8e81/synapse/rest/client/sync.py#L276-L281
273
+ // Only try to get OTKs if the context isn't already done.
274
+ if syncReq .Context .Err () == nil {
275
+ err = internal .DeviceOTKCounts (syncReq .Context , rp .keyAPI , syncReq .Device .UserID , syncReq .Device .ID , syncReq .Response )
276
+ if err != nil && err != context .Canceled {
277
+ syncReq .Log .WithError (err ).Warn ("failed to get OTK counts" )
278
+ }
279
+ }
280
+ return util.JSONResponse {
281
+ Code : http .StatusOK ,
282
+ JSON : syncReq .Response ,
274
283
}
275
284
}
276
- return util.JSONResponse {
277
- Code : http .StatusOK ,
278
- JSON : syncReq .Response ,
279
- }
280
- }
281
285
282
- select {
283
- case <- syncReq .Context .Done (): // Caller gave up
284
- return giveup ()
286
+ select {
287
+ case <- syncReq .Context .Done (): // Caller gave up
288
+ return giveup ()
285
289
286
- case <- timer .C : // Timeout reached
287
- return giveup ()
290
+ case <- timer .C : // Timeout reached
291
+ return giveup ()
288
292
289
- case <- userStreamListener .GetNotifyChannel (syncReq .Since ):
290
- syncReq .Log .Debugln ("Responding to sync after wake-up" )
291
- currentPos .ApplyUpdates (userStreamListener .GetSyncPosition ())
293
+ case <- userStreamListener .GetNotifyChannel (syncReq .Since ):
294
+ syncReq .Log .Debugln ("Responding to sync after wake-up" )
295
+ currentPos .ApplyUpdates (userStreamListener .GetSyncPosition ())
296
+ }
297
+ } else {
298
+ syncReq .Log .WithField ("currentPos" , currentPos ).Debugln ("Responding to sync immediately" )
292
299
}
293
- } else {
294
- syncReq .Log .WithField ("currentPos" , currentPos ).Debugln ("Responding to sync immediately" )
295
- }
296
300
297
- if syncReq .Since .IsEmpty () {
298
- // Complete sync
299
- syncReq .Response .NextBatch = types.StreamingToken {
300
- PDUPosition : rp .streams .PDUStreamProvider .CompleteSync (
301
- syncReq .Context , syncReq ,
302
- ),
303
- TypingPosition : rp .streams .TypingStreamProvider .CompleteSync (
304
- syncReq .Context , syncReq ,
305
- ),
306
- ReceiptPosition : rp .streams .ReceiptStreamProvider .CompleteSync (
307
- syncReq .Context , syncReq ,
308
- ),
309
- InvitePosition : rp .streams .InviteStreamProvider .CompleteSync (
310
- syncReq .Context , syncReq ,
311
- ),
312
- SendToDevicePosition : rp .streams .SendToDeviceStreamProvider .CompleteSync (
313
- syncReq .Context , syncReq ,
314
- ),
315
- AccountDataPosition : rp .streams .AccountDataStreamProvider .CompleteSync (
316
- syncReq .Context , syncReq ,
317
- ),
318
- NotificationDataPosition : rp .streams .NotificationDataStreamProvider .CompleteSync (
319
- syncReq .Context , syncReq ,
320
- ),
321
- DeviceListPosition : rp .streams .DeviceListStreamProvider .CompleteSync (
322
- syncReq .Context , syncReq ,
323
- ),
324
- PresencePosition : rp .streams .PresenceStreamProvider .CompleteSync (
325
- syncReq .Context , syncReq ,
326
- ),
327
- }
328
- } else {
329
- // Incremental sync
330
- syncReq .Response .NextBatch = types.StreamingToken {
331
- PDUPosition : rp .streams .PDUStreamProvider .IncrementalSync (
332
- syncReq .Context , syncReq ,
333
- syncReq .Since .PDUPosition , currentPos .PDUPosition ,
334
- ),
335
- TypingPosition : rp .streams .TypingStreamProvider .IncrementalSync (
336
- syncReq .Context , syncReq ,
337
- syncReq .Since .TypingPosition , currentPos .TypingPosition ,
338
- ),
339
- ReceiptPosition : rp .streams .ReceiptStreamProvider .IncrementalSync (
340
- syncReq .Context , syncReq ,
341
- syncReq .Since .ReceiptPosition , currentPos .ReceiptPosition ,
342
- ),
343
- InvitePosition : rp .streams .InviteStreamProvider .IncrementalSync (
344
- syncReq .Context , syncReq ,
345
- syncReq .Since .InvitePosition , currentPos .InvitePosition ,
346
- ),
347
- SendToDevicePosition : rp .streams .SendToDeviceStreamProvider .IncrementalSync (
348
- syncReq .Context , syncReq ,
349
- syncReq .Since .SendToDevicePosition , currentPos .SendToDevicePosition ,
350
- ),
351
- AccountDataPosition : rp .streams .AccountDataStreamProvider .IncrementalSync (
352
- syncReq .Context , syncReq ,
353
- syncReq .Since .AccountDataPosition , currentPos .AccountDataPosition ,
354
- ),
355
- NotificationDataPosition : rp .streams .NotificationDataStreamProvider .IncrementalSync (
356
- syncReq .Context , syncReq ,
357
- syncReq .Since .NotificationDataPosition , currentPos .NotificationDataPosition ,
358
- ),
359
- DeviceListPosition : rp .streams .DeviceListStreamProvider .IncrementalSync (
360
- syncReq .Context , syncReq ,
361
- syncReq .Since .DeviceListPosition , currentPos .DeviceListPosition ,
362
- ),
363
- PresencePosition : rp .streams .PresenceStreamProvider .IncrementalSync (
364
- syncReq .Context , syncReq ,
365
- syncReq .Since .PresencePosition , currentPos .PresencePosition ,
366
- ),
301
+ if syncReq .Since .IsEmpty () {
302
+ // Complete sync
303
+ syncReq .Response .NextBatch = types.StreamingToken {
304
+ PDUPosition : rp .streams .PDUStreamProvider .CompleteSync (
305
+ syncReq .Context , syncReq ,
306
+ ),
307
+ TypingPosition : rp .streams .TypingStreamProvider .CompleteSync (
308
+ syncReq .Context , syncReq ,
309
+ ),
310
+ ReceiptPosition : rp .streams .ReceiptStreamProvider .CompleteSync (
311
+ syncReq .Context , syncReq ,
312
+ ),
313
+ InvitePosition : rp .streams .InviteStreamProvider .CompleteSync (
314
+ syncReq .Context , syncReq ,
315
+ ),
316
+ SendToDevicePosition : rp .streams .SendToDeviceStreamProvider .CompleteSync (
317
+ syncReq .Context , syncReq ,
318
+ ),
319
+ AccountDataPosition : rp .streams .AccountDataStreamProvider .CompleteSync (
320
+ syncReq .Context , syncReq ,
321
+ ),
322
+ NotificationDataPosition : rp .streams .NotificationDataStreamProvider .CompleteSync (
323
+ syncReq .Context , syncReq ,
324
+ ),
325
+ DeviceListPosition : rp .streams .DeviceListStreamProvider .CompleteSync (
326
+ syncReq .Context , syncReq ,
327
+ ),
328
+ PresencePosition : rp .streams .PresenceStreamProvider .CompleteSync (
329
+ syncReq .Context , syncReq ,
330
+ ),
331
+ }
332
+ } else {
333
+ // Incremental sync
334
+ syncReq .Response .NextBatch = types.StreamingToken {
335
+ PDUPosition : rp .streams .PDUStreamProvider .IncrementalSync (
336
+ syncReq .Context , syncReq ,
337
+ syncReq .Since .PDUPosition , currentPos .PDUPosition ,
338
+ ),
339
+ TypingPosition : rp .streams .TypingStreamProvider .IncrementalSync (
340
+ syncReq .Context , syncReq ,
341
+ syncReq .Since .TypingPosition , currentPos .TypingPosition ,
342
+ ),
343
+ ReceiptPosition : rp .streams .ReceiptStreamProvider .IncrementalSync (
344
+ syncReq .Context , syncReq ,
345
+ syncReq .Since .ReceiptPosition , currentPos .ReceiptPosition ,
346
+ ),
347
+ InvitePosition : rp .streams .InviteStreamProvider .IncrementalSync (
348
+ syncReq .Context , syncReq ,
349
+ syncReq .Since .InvitePosition , currentPos .InvitePosition ,
350
+ ),
351
+ SendToDevicePosition : rp .streams .SendToDeviceStreamProvider .IncrementalSync (
352
+ syncReq .Context , syncReq ,
353
+ syncReq .Since .SendToDevicePosition , currentPos .SendToDevicePosition ,
354
+ ),
355
+ AccountDataPosition : rp .streams .AccountDataStreamProvider .IncrementalSync (
356
+ syncReq .Context , syncReq ,
357
+ syncReq .Since .AccountDataPosition , currentPos .AccountDataPosition ,
358
+ ),
359
+ NotificationDataPosition : rp .streams .NotificationDataStreamProvider .IncrementalSync (
360
+ syncReq .Context , syncReq ,
361
+ syncReq .Since .NotificationDataPosition , currentPos .NotificationDataPosition ,
362
+ ),
363
+ DeviceListPosition : rp .streams .DeviceListStreamProvider .IncrementalSync (
364
+ syncReq .Context , syncReq ,
365
+ syncReq .Since .DeviceListPosition , currentPos .DeviceListPosition ,
366
+ ),
367
+ PresencePosition : rp .streams .PresenceStreamProvider .IncrementalSync (
368
+ syncReq .Context , syncReq ,
369
+ syncReq .Since .PresencePosition , currentPos .PresencePosition ,
370
+ ),
371
+ }
372
+ // it's possible for there to be no updates for this user even though since < current pos,
373
+ // e.g busy servers with a quiet user. In this scenario, we don't want to return a no-op
374
+ // response immediately, so let's try this again but pretend they bumped their since token.
375
+ // If the incremental sync was processed very quickly then we expect the next loop to block
376
+ // with a notifier, but if things are slow it's entirely possible that currentPos is no
377
+ // longer the current position so we will hit this code path again. We need to do this and
378
+ // not return a no-op response because:
379
+ // - It's an inefficient use of bandwidth.
380
+ // - Some sytests which test 'waking up' sync rely on some sync requests to block, which
381
+ // they weren't always doing, resulting in flakey tests.
382
+ if ! syncReq .Response .HasUpdates () {
383
+ syncReq .Since = currentPos
384
+ // do not loop again if the ?timeout= is 0 as that means "return immediately"
385
+ if syncReq .Timeout > 0 {
386
+ syncReq .Timeout = syncReq .Timeout - time .Since (startTime )
387
+ if syncReq .Timeout < 0 {
388
+ syncReq .Timeout = 0
389
+ }
390
+ continue
391
+ }
392
+ }
367
393
}
368
- }
369
394
370
- return util.JSONResponse {
371
- Code : http .StatusOK ,
372
- JSON : syncReq .Response ,
395
+ return util.JSONResponse {
396
+ Code : http .StatusOK ,
397
+ JSON : syncReq .Response ,
398
+ }
373
399
}
374
400
}
375
401
0 commit comments