@@ -56,7 +56,7 @@ use self::{
56
56
client:: SlidingSyncResponseProcessor ,
57
57
sticky_parameters:: { LazyTransactionId , SlidingSyncStickyManager , StickyData } ,
58
58
} ;
59
- use crate :: { config:: RequestConfig , Client , Result } ;
59
+ use crate :: { config:: RequestConfig , Client , HttpError , Result } ;
60
60
61
61
/// The Sliding Sync instance.
62
62
///
@@ -518,6 +518,136 @@ impl SlidingSync {
518
518
) )
519
519
}
520
520
521
+ /// Send a sliding sync request.
522
+ ///
523
+ /// This method contains the sending logic. It takes a generic `Request`
524
+ /// because it can be a Simplified MSC3575 or a MSC3575 `Request`.
525
+ async fn send_sync_request < Request > (
526
+ & self ,
527
+ request : Request ,
528
+ request_config : RequestConfig ,
529
+ mut position_guard : OwnedMutexGuard < SlidingSyncPositionMarkers > ,
530
+ ) -> Result < UpdateSummary >
531
+ where
532
+ Request : OutgoingRequest + Clone + Debug + Send + Sync + ' static ,
533
+ Request :: IncomingResponse : Send
534
+ + Sync
535
+ +
536
+ // This is required to get back a Simplified MSC3575 `Response` whatever the
537
+ // `Request` type.
538
+ Into < http:: Response > ,
539
+ HttpError : From < ruma:: api:: error:: FromHttpResponseError < Request :: EndpointError > > ,
540
+ {
541
+ debug ! ( "Sending request" ) ;
542
+
543
+ // Prepare the request.
544
+ let request =
545
+ self . inner . client . send ( request, Some ( request_config) ) . with_homeserver_override (
546
+ self . inner . sliding_sync_proxy . as_ref ( ) . map ( ToString :: to_string) ,
547
+ ) ;
548
+
549
+ // Send the request and get a response with end-to-end encryption support.
550
+ //
551
+ // Sending the `/sync` request out when end-to-end encryption is enabled means
552
+ // that we need to also send out any outgoing e2ee related request out
553
+ // coming from the `OlmMachine::outgoing_requests()` method.
554
+
555
+ #[ cfg( feature = "e2e-encryption" ) ]
556
+ let response = {
557
+ if self . is_e2ee_enabled ( ) {
558
+ // Here, we need to run 2 things:
559
+ //
560
+ // 1. Send the sliding sync request and get a response,
561
+ // 2. Send the E2EE requests.
562
+ //
563
+ // We don't want to use a `join` or `try_join` because we want to fail if and
564
+ // only if sending the sliding sync request fails. Failing to send the E2EE
565
+ // requests should just result in a log.
566
+ //
567
+ // We also want to give the priority to sliding sync request. E2EE requests are
568
+ // sent concurrently to the sliding sync request, but the priority is on waiting
569
+ // a sliding sync response.
570
+ //
571
+ // If sending sliding sync request fails, the sending of E2EE requests must be
572
+ // aborted as soon as possible.
573
+
574
+ let client = self . inner . client . clone ( ) ;
575
+ let e2ee_uploads = spawn ( async move {
576
+ if let Err ( error) = client. send_outgoing_requests ( ) . await {
577
+ error ! ( ?error, "Error while sending outgoing E2EE requests" ) ;
578
+ }
579
+ } )
580
+ // Ensure that the task is not running in detached mode. It is aborted when it's
581
+ // dropped.
582
+ . abort_on_drop ( ) ;
583
+
584
+ // Wait on the sliding sync request success or failure early.
585
+ let response = request. await ?;
586
+
587
+ // At this point, if `request` has been resolved successfully, we wait on
588
+ // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
589
+ // long. Otherwise —if `request` has failed— `e2ee_uploads` has
590
+ // been dropped, so aborted.
591
+ e2ee_uploads. await . map_err ( |error| Error :: JoinError {
592
+ task_description : "e2ee_uploads" . to_owned ( ) ,
593
+ error,
594
+ } ) ?;
595
+
596
+ response
597
+ } else {
598
+ request. await ?
599
+ }
600
+ } ;
601
+
602
+ // Send the request and get a response _without_ end-to-end encryption support.
603
+ #[ cfg( not( feature = "e2e-encryption" ) ) ]
604
+ let response = request. await ?;
605
+
606
+ // The code manipulates `Request` and `Response` from Simplified MSC3575 because
607
+ // it's the future standard. But this function may have received a `Request`
608
+ // from Simplified MSC3575 or MSC3575. We need to get back a
609
+ // Simplified MSC3575 `Response`.
610
+ let response = Into :: < http:: simplified_msc3575:: Response > :: into ( response) ;
611
+
612
+ debug ! ( "Received response" ) ;
613
+
614
+ // At this point, the request has been sent, and a response has been received.
615
+ //
616
+ // We must ensure the handling of the response cannot be stopped/
617
+ // cancelled. It must be done entirely, otherwise we can have
618
+ // corrupted/incomplete states for Sliding Sync and other parts of
619
+ // the code.
620
+ //
621
+ // That's why we are running the handling of the response in a spawned
622
+ // future that cannot be cancelled by anything.
623
+ let this = self . clone ( ) ;
624
+
625
+ // Spawn a new future to ensure that the code inside this future cannot be
626
+ // cancelled if this method is cancelled.
627
+ let future = async move {
628
+ debug ! ( "Start handling response" ) ;
629
+
630
+ // In case the task running this future is detached, we must
631
+ // ensure responses are handled one at a time. At this point we still own
632
+ // `position_guard`, so we're fine.
633
+
634
+ // Handle the response.
635
+ let updates = this. handle_response ( response, & mut position_guard) . await ?;
636
+
637
+ this. cache_to_storage ( & position_guard) . await ?;
638
+
639
+ // Release the position guard lock.
640
+ // It means that other responses can be generated and then handled later.
641
+ drop ( position_guard) ;
642
+
643
+ debug ! ( "Done handling response" ) ;
644
+
645
+ Ok ( updates)
646
+ } ;
647
+
648
+ spawn ( future. instrument ( Span :: current ( ) ) ) . await . unwrap ( )
649
+ }
650
+
521
651
/// Is the e2ee extension enabled for this sliding sync instance?
522
652
#[ cfg( feature = "e2e-encryption" ) ]
523
653
fn is_e2ee_enabled ( & self ) -> bool {
@@ -546,144 +676,15 @@ impl SlidingSync {
546
676
// because it's the future standard. If
547
677
// `Client::is_simplified_sliding_sync_enabled` is turned off, the
548
678
// Simplified MSC3575 `Request` must be transformed into a MSC3575 `Request`.
549
- return if !self . inner . client . is_simplified_sliding_sync_enabled ( ) {
550
- send (
551
- self ,
679
+ if !self . inner . client . is_simplified_sliding_sync_enabled ( ) {
680
+ self . send_sync_request (
552
681
Into :: < http:: msc3575:: Request > :: into ( request) ,
553
682
request_config,
554
683
position_guard,
555
684
)
556
685
. await
557
686
} else {
558
- send ( self , request, request_config, position_guard) . await
559
- } ;
560
-
561
- // The sending logic. It takes a generic `Request` because it can be a
562
- // Simplified MSC3575 or a MSC3575 `Request`.
563
- async fn send < Request > (
564
- this : & SlidingSync ,
565
- request : Request ,
566
- request_config : RequestConfig ,
567
- mut position_guard : OwnedMutexGuard < SlidingSyncPositionMarkers > ,
568
- ) -> Result < UpdateSummary >
569
- where
570
- Request : OutgoingRequest + Clone + Debug + Send + Sync + ' static ,
571
- Request :: IncomingResponse : Send
572
- + Sync
573
- +
574
- // This is required to get back a Simplified MSC3575 `Response` whatever the
575
- // `Request` type.
576
- Into < http:: Response > ,
577
- crate :: HttpError : From < ruma:: api:: error:: FromHttpResponseError < Request :: EndpointError > > ,
578
- {
579
- debug ! ( "Sending request" ) ;
580
-
581
- // Prepare the request.
582
- let request =
583
- this. inner . client . send ( request, Some ( request_config) ) . with_homeserver_override (
584
- this. inner . sliding_sync_proxy . as_ref ( ) . map ( ToString :: to_string) ,
585
- ) ;
586
-
587
- // Send the request and get a response with end-to-end encryption support.
588
- //
589
- // Sending the `/sync` request out when end-to-end encryption is enabled means
590
- // that we need to also send out any outgoing e2ee related request out
591
- // coming from the `OlmMachine::outgoing_requests()` method.
592
-
593
- #[ cfg( feature = "e2e-encryption" ) ]
594
- let response = {
595
- if this. is_e2ee_enabled ( ) {
596
- // Here, we need to run 2 things:
597
- //
598
- // 1. Send the sliding sync request and get a response,
599
- // 2. Send the E2EE requests.
600
- //
601
- // We don't want to use a `join` or `try_join` because we want to fail if and
602
- // only if sending the sliding sync request fails. Failing to send the E2EE
603
- // requests should just result in a log.
604
- //
605
- // We also want to give the priority to sliding sync request. E2EE requests are
606
- // sent concurrently to the sliding sync request, but the priority is on waiting
607
- // a sliding sync response.
608
- //
609
- // If sending sliding sync request fails, the sending of E2EE requests must be
610
- // aborted as soon as possible.
611
-
612
- let client = this. inner . client . clone ( ) ;
613
- let e2ee_uploads = spawn ( async move {
614
- if let Err ( error) = client. send_outgoing_requests ( ) . await {
615
- error ! ( ?error, "Error while sending outgoing E2EE requests" ) ;
616
- }
617
- } )
618
- // Ensure that the task is not running in detached mode. It is aborted when it's
619
- // dropped.
620
- . abort_on_drop ( ) ;
621
-
622
- // Wait on the sliding sync request success or failure early.
623
- let response = request. await ?;
624
-
625
- // At this point, if `request` has been resolved successfully, we wait on
626
- // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
627
- // long. Otherwise —if `request` has failed— `e2ee_uploads` has
628
- // been dropped, so aborted.
629
- e2ee_uploads. await . map_err ( |error| Error :: JoinError {
630
- task_description : "e2ee_uploads" . to_owned ( ) ,
631
- error,
632
- } ) ?;
633
-
634
- response
635
- } else {
636
- request. await ?
637
- }
638
- } ;
639
-
640
- // Send the request and get a response _without_ end-to-end encryption support.
641
- #[ cfg( not( feature = "e2e-encryption" ) ) ]
642
- let response = request. await ?;
643
-
644
- // The code manipulates `Request` and `Response` from Simplified MSC3575 because
645
- // it's the future standard. But this function may have received a `Request`
646
- // from Simplified MSC3575 or MSC3575. We need to get back a
647
- // Simplified MSC3575 `Response`.
648
- let response = Into :: < http:: simplified_msc3575:: Response > :: into ( response) ;
649
-
650
- debug ! ( "Received response" ) ;
651
-
652
- // At this point, the request has been sent, and a response has been received.
653
- //
654
- // We must ensure the handling of the response cannot be stopped/
655
- // cancelled. It must be done entirely, otherwise we can have
656
- // corrupted/incomplete states for Sliding Sync and other parts of
657
- // the code.
658
- //
659
- // That's why we are running the handling of the response in a spawned
660
- // future that cannot be cancelled by anything.
661
- let this = this. clone ( ) ;
662
-
663
- // Spawn a new future to ensure that the code inside this future cannot be
664
- // cancelled if this method is cancelled.
665
- let future = async move {
666
- debug ! ( "Start handling response" ) ;
667
-
668
- // In case the task running this future is detached, we must
669
- // ensure responses are handled one at a time. At this point we still own
670
- // `position_guard`, so we're fine.
671
-
672
- // Handle the response.
673
- let updates = this. handle_response ( response, & mut position_guard) . await ?;
674
-
675
- this. cache_to_storage ( & position_guard) . await ?;
676
-
677
- // Release the position guard lock.
678
- // It means that other responses can be generated and then handled later.
679
- drop ( position_guard) ;
680
-
681
- debug ! ( "Done handling response" ) ;
682
-
683
- Ok ( updates)
684
- } ;
685
-
686
- spawn ( future. instrument ( Span :: current ( ) ) ) . await . unwrap ( )
687
+ self . send_sync_request ( request, request_config, position_guard) . await
687
688
}
688
689
}
689
690
0 commit comments