@@ -36,7 +36,10 @@ use async_stream::stream;
36
36
use futures_core:: stream:: Stream ;
37
37
pub use matrix_sdk_base:: sliding_sync:: http;
38
38
use matrix_sdk_common:: { ring_buffer:: RingBuffer , timer} ;
39
- use ruma:: { api:: client:: error:: ErrorKind , assign, OwnedEventId , OwnedRoomId , RoomId } ;
39
+ use ruma:: {
40
+ api:: { client:: error:: ErrorKind , OutgoingRequest } ,
41
+ assign, OwnedEventId , OwnedRoomId , RoomId ,
42
+ } ;
40
43
use serde:: { Deserialize , Serialize } ;
41
44
use tokio:: {
42
45
select, spawn,
@@ -536,127 +539,152 @@ impl SlidingSync {
536
539
537
540
#[ instrument( skip_all, fields( pos) ) ]
538
541
async fn sync_once ( & self ) -> Result < UpdateSummary > {
539
- let ( request, request_config, mut position_guard) =
542
+ let ( request, request_config, position_guard) =
540
543
self . generate_sync_request ( & mut LazyTransactionId :: new ( ) ) . await ?;
541
544
542
545
// The code manipulates `Request` and `Response` from Simplified MSC3575
543
- // because it's simpler. If the `experimental-not-simplified-sliding-sync`
544
- // feature flag is turned on, we transform the Simplified MSC3575 `Request`
545
- // into a MSC3575 `Request`.
546
- #[ cfg( feature = "experimental-not-simplified-sliding-sync" ) ]
547
- let request: http:: msc3575:: Request = http:: From :: from ( request) ;
548
-
549
- debug ! ( "Sending request" ) ;
550
-
551
- // Prepare the request.
552
- let request =
553
- self . inner . client . send ( request, Some ( request_config) ) . with_homeserver_override (
554
- self . inner . sliding_sync_proxy . as_ref ( ) . map ( ToString :: to_string) ,
555
- ) ;
546
+ // because it's the future standard. If
547
+ // `Client::is_simplified_sliding_sync_enabled` is turned off, the
548
+ // Simplified MSC3575 `Request` must be transformed into a MSC3575 `Request`.
549
+ return if !self . inner . client . is_simplified_sliding_sync_enabled ( ) {
550
+ send (
551
+ self ,
552
+ http:: Into :: < http:: msc3575:: Request > :: into ( request) ,
553
+ request_config,
554
+ position_guard,
555
+ )
556
+ . await
557
+ } else {
558
+ send ( self , request, request_config, position_guard) . await
559
+ } ;
556
560
557
- // Send the request and get a response with end-to-end encryption support.
558
- //
559
- // Sending the `/sync` request out when end-to-end encryption is enabled means
560
- // that we need to also send out any outgoing e2ee related request out
561
- // coming from the `OlmMachine::outgoing_requests()` method.
561
+ // The sending logic. It takes a generic `Request` because it can be a
562
+ // Simplified MSC3575 or a MSC3575 `Request`.
563
+ async fn send < Request > (
564
+ this : & SlidingSync ,
565
+ request : Request ,
566
+ request_config : RequestConfig ,
567
+ mut position_guard : OwnedMutexGuard < SlidingSyncPositionMarkers > ,
568
+ ) -> Result < UpdateSummary >
569
+ where
570
+ Request : OutgoingRequest + Clone + Debug + Send + Sync + ' static ,
571
+ Request :: IncomingResponse : Send
572
+ + Sync
573
+ +
574
+ // This is required to get back a Simplified MSC3575 `Response` whatever the
575
+ // `Request` type.
576
+ http:: Into < http:: Response > ,
577
+ crate :: HttpError : From < ruma:: api:: error:: FromHttpResponseError < Request :: EndpointError > > ,
578
+ {
579
+ debug ! ( "Sending request" ) ;
562
580
563
- #[ cfg( feature = "e2e-encryption" ) ]
564
- let response = {
565
- if self . is_e2ee_enabled ( ) {
566
- // Here, we need to run 2 things:
567
- //
568
- // 1. Send the sliding sync request and get a response,
569
- // 2. Send the E2EE requests.
570
- //
571
- // We don't want to use a `join` or `try_join` because we want to fail if and
572
- // only if sending the sliding sync request fails. Failing to send the E2EE
573
- // requests should just result in a log.
574
- //
575
- // We also want to give the priority to sliding sync request. E2EE requests are
576
- // sent concurrently to the sliding sync request, but the priority is on waiting
577
- // a sliding sync response.
578
- //
579
- // If sending sliding sync request fails, the sending of E2EE requests must be
580
- // aborted as soon as possible.
581
+ // Prepare the request.
582
+ let request =
583
+ this. inner . client . send ( request, Some ( request_config) ) . with_homeserver_override (
584
+ this. inner . sliding_sync_proxy . as_ref ( ) . map ( ToString :: to_string) ,
585
+ ) ;
581
586
582
- let client = self . inner . client . clone ( ) ;
583
- let e2ee_uploads = spawn ( async move {
584
- if let Err ( error) = client. send_outgoing_requests ( ) . await {
585
- error ! ( ?error, "Error while sending outgoing E2EE requests" ) ;
586
- }
587
- } )
588
- // Ensure that the task is not running in detached mode. It is aborted when it's
589
- // dropped.
590
- . abort_on_drop ( ) ;
591
-
592
- // Wait on the sliding sync request success or failure early.
593
- let response = request. await ?;
594
-
595
- // At this point, if `request` has been resolved successfully, we wait on
596
- // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
597
- // long. Otherwise —if `request` has failed— `e2ee_uploads` has
598
- // been dropped, so aborted.
599
- e2ee_uploads. await . map_err ( |error| Error :: JoinError {
600
- task_description : "e2ee_uploads" . to_owned ( ) ,
601
- error,
602
- } ) ?;
603
-
604
- response
605
- } else {
606
- request. await ?
607
- }
608
- } ;
587
+ // Send the request and get a response with end-to-end encryption support.
588
+ //
589
+ // Sending the `/sync` request out when end-to-end encryption is enabled means
590
+ // that we need to also send out any outgoing e2ee related request out
591
+ // coming from the `OlmMachine::outgoing_requests()` method.
609
592
610
- // Send the request and get a response _without_ end-to-end encryption support.
611
- #[ cfg( not( feature = "e2e-encryption" ) ) ]
612
- let response = request. await ?;
593
+ #[ cfg( feature = "e2e-encryption" ) ]
594
+ let response = {
595
+ if this. is_e2ee_enabled ( ) {
596
+ // Here, we need to run 2 things:
597
+ //
598
+ // 1. Send the sliding sync request and get a response,
599
+ // 2. Send the E2EE requests.
600
+ //
601
+ // We don't want to use a `join` or `try_join` because we want to fail if and
602
+ // only if sending the sliding sync request fails. Failing to send the E2EE
603
+ // requests should just result in a log.
604
+ //
605
+ // We also want to give the priority to sliding sync request. E2EE requests are
606
+ // sent concurrently to the sliding sync request, but the priority is on waiting
607
+ // a sliding sync response.
608
+ //
609
+ // If sending sliding sync request fails, the sending of E2EE requests must be
610
+ // aborted as soon as possible.
611
+
612
+ let client = this. inner . client . clone ( ) ;
613
+ let e2ee_uploads = spawn ( async move {
614
+ if let Err ( error) = client. send_outgoing_requests ( ) . await {
615
+ error ! ( ?error, "Error while sending outgoing E2EE requests" ) ;
616
+ }
617
+ } )
618
+ // Ensure that the task is not running in detached mode. It is aborted when it's
619
+ // dropped.
620
+ . abort_on_drop ( ) ;
621
+
622
+ // Wait on the sliding sync request success or failure early.
623
+ let response = request. await ?;
624
+
625
+ // At this point, if `request` has been resolved successfully, we wait on
626
+ // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
627
+ // long. Otherwise —if `request` has failed— `e2ee_uploads` has
628
+ // been dropped, so aborted.
629
+ e2ee_uploads. await . map_err ( |error| Error :: JoinError {
630
+ task_description : "e2ee_uploads" . to_owned ( ) ,
631
+ error,
632
+ } ) ?;
633
+
634
+ response
635
+ } else {
636
+ request. await ?
637
+ }
638
+ } ;
613
639
614
- // The code manipulates `Request` and `Response` from Simplified MSC3575
615
- // because it's simpler. If the `experimental-not-simplified-sliding-sync`
616
- // feature flag is turned on, the Simplified MSC3575 `Request` has been
617
- // transformed into a MSC3575 `Request` a couple of lines above. Now it's
618
- // time to do the opposite: transform the MSC3575 `Response` into a
619
- // Simplified MSC3575 `Response`.
620
- #[ cfg( feature = "experimental-not-simplified-sliding-sync" ) ]
621
- let response: http:: simplified_msc3575:: Response = http:: From :: from ( response) ;
640
+ // Send the request and get a response _without_ end-to-end encryption support.
641
+ #[ cfg( not( feature = "e2e-encryption" ) ) ]
642
+ let response = request. await ?;
622
643
623
- debug ! ( "Received response" ) ;
644
+ // The code manipulates `Request` and `Response` from Simplified MSC3575 because
645
+ // it's the future standard. But this function may have received a `Request`
646
+ // from Simplified MSC3575 or MSC3575. We need to get back a
647
+ // Simplified MSC3575 `Response`.
648
+ let response = http:: Into :: < http:: simplified_msc3575:: Response > :: into ( response) ;
624
649
625
- // At this point, the request has been sent, and a response has been received.
626
- //
627
- // We must ensure the handling of the response cannot be stopped/
628
- // cancelled. It must be done entirely, otherwise we can have
629
- // corrupted/incomplete states for Sliding Sync and other parts of
630
- // the code.
631
- //
632
- // That's why we are running the handling of the response in a spawned
633
- // future that cannot be cancelled by anything.
634
- let this = self . clone ( ) ;
650
+ debug ! ( "Received response" ) ;
651
+
652
+ // At this point, the request has been sent, and a response has been received.
653
+ //
654
+ // We must ensure the handling of the response cannot be stopped/
655
+ // cancelled. It must be done entirely, otherwise we can have
656
+ // corrupted/incomplete states for Sliding Sync and other parts of
657
+ // the code.
658
+ //
659
+ // That's why we are running the handling of the response in a spawned
660
+ // future that cannot be cancelled by anything.
661
+ let this = this. clone ( ) ;
635
662
636
- // Spawn a new future to ensure that the code inside this future cannot be
637
- // cancelled if this method is cancelled.
638
- let future = async move {
639
- debug ! ( "Start handling response" ) ;
663
+ // Spawn a new future to ensure that the code inside this future cannot be
664
+ // cancelled if this method is cancelled.
665
+ let future = async move {
666
+ debug ! ( "Start handling response" ) ;
640
667
641
- // In case the task running this future is detached, we must
642
- // ensure responses are handled one at a time. At this point we still own
643
- // `position_guard`, so we're fine.
668
+ // In case the task running this future is detached, we must
669
+ // ensure responses are handled one at a time. At this point we still own
670
+ // `position_guard`, so we're fine.
644
671
645
- // Handle the response.
646
- let updates = this. handle_response ( response, & mut position_guard) . await ?;
672
+ // Handle the response.
673
+ let updates = this. handle_response ( response, & mut position_guard) . await ?;
647
674
648
- this. cache_to_storage ( & position_guard) . await ?;
675
+ this. cache_to_storage ( & position_guard) . await ?;
649
676
650
- // Release the position guard lock.
651
- // It means that other responses can be generated and then handled later.
652
- drop ( position_guard) ;
677
+ // Release the position guard lock.
678
+ // It means that other responses can be generated and then handled later.
679
+ drop ( position_guard) ;
653
680
654
- debug ! ( "Done handling response" ) ;
681
+ debug ! ( "Done handling response" ) ;
655
682
656
- Ok ( updates)
657
- } ;
683
+ Ok ( updates)
684
+ } ;
658
685
659
- spawn ( future. instrument ( Span :: current ( ) ) ) . await . unwrap ( )
686
+ spawn ( future. instrument ( Span :: current ( ) ) ) . await . unwrap ( )
687
+ }
660
688
}
661
689
662
690
/// Create a _new_ Sliding Sync sync loop.
0 commit comments