@@ -23,7 +23,7 @@ use chrono::{DateTime, Utc};
23
23
use fnv:: { FnvHashMap , FnvHashSet } ;
24
24
use futures:: {
25
25
channel:: {
26
- mpsc:: { self , Receiver , Sender , UnboundedReceiver , UnboundedSender } ,
26
+ mpsc:: { self , Receiver , Sender , TrySendError , UnboundedReceiver , UnboundedSender } ,
27
27
oneshot,
28
28
} ,
29
29
future:: { self , Either } ,
@@ -42,7 +42,7 @@ use libp2p::tcp::TokioTcpConfig as TcpConfig;
42
42
use libp2p:: {
43
43
core:: {
44
44
either:: EitherTransport ,
45
- transport:: Transport ,
45
+ transport:: { ListenerId , Transport } ,
46
46
upgrade:: { SelectUpgrade , Version } ,
47
47
} ,
48
48
identity:: ed25519:: PublicKey ,
@@ -273,23 +273,17 @@ impl NetworkService {
273
273
}
274
274
275
275
fn cmd ( & mut self , msg : NetworkCommand ) -> Option < ( NetworkCommand , & ' static str ) > {
276
- match self . cmd . try_send ( msg) {
277
- Ok ( _) => None ,
278
- Err ( err) => {
279
- let reason = if err. is_disconnected ( ) {
280
- "receiver went away"
281
- } else {
282
- "channel is full"
283
- } ;
284
- let val = err. into_inner ( ) ;
285
- tracing:: warn!( "failed IPFS swarm command {:?}: {}" , val, reason) ;
286
- Some ( ( val, reason) )
287
- }
288
- }
276
+ Self :: handle_send_result ( self . cmd . try_send ( msg) )
289
277
}
290
278
291
279
fn cmd_shared ( & self , msg : NetworkCommand ) -> Option < ( NetworkCommand , & ' static str ) > {
292
- match self . cmd . clone ( ) . try_send ( msg) {
280
+ Self :: handle_send_result ( self . cmd . clone ( ) . try_send ( msg) )
281
+ }
282
+
283
+ fn handle_send_result (
284
+ res : Result < ( ) , TrySendError < NetworkCommand > > ,
285
+ ) -> Option < ( NetworkCommand , & ' static str ) > {
286
+ match res {
293
287
Ok ( _) => None ,
294
288
Err ( err) => {
295
289
let reason = if err. is_disconnected ( ) {
@@ -328,7 +322,7 @@ impl NetworkService {
328
322
}
329
323
330
324
pub fn external_addresses ( & self ) -> Vec < AddressRecord > {
331
- self . external . cloned ( )
325
+ self . external . get_cloned ( )
332
326
}
333
327
334
328
pub fn add_address ( & mut self , peer : PeerId , addr : Multiaddr ) {
@@ -504,6 +498,7 @@ impl NetworkService {
504
498
async { rx. await ? } . right_future ( )
505
499
}
506
500
501
+ // This cannot take `&mut self` due to trait constraints, so it needs to use the less efficient cmd_shared.
507
502
pub fn get ( & self , cid : Cid , providers : Vec < PeerId > ) -> impl Future < Output = Result < GetQuery > > {
508
503
let ( tx, rx) = oneshot:: channel ( ) ;
509
504
if let Some ( ( _, err) ) = self . cmd_shared ( NetworkCommand :: Get ( cid, providers, tx) ) {
@@ -512,6 +507,7 @@ impl NetworkService {
512
507
async { Ok ( rx. await ?) } . right_future ( )
513
508
}
514
509
510
+ // This cannot take `&mut self` due to trait constraints, so it needs to use the less efficient cmd_shared.
515
511
pub fn sync (
516
512
& self ,
517
513
cid : Cid ,
@@ -581,6 +577,7 @@ async fn poll_swarm<P: StoreParams>(
581
577
behaviour:: NetworkBackendBehaviourEvent :: Kad ( e) => {
582
578
let mut bootstrap_complete = * bootstrapped. read ( ) ;
583
579
let bootstrap_old = bootstrap_complete;
580
+ // DO NOT HOLD bootstrapped LOCK ACROSS ARBITRARY CODE
584
581
swarm. inject_kad_event ( e, & mut bootstrap_complete, & mut queries) ;
585
582
if bootstrap_complete != bootstrap_old {
586
583
* bootstrapped. write ( ) = bootstrap_complete;
@@ -618,33 +615,7 @@ async fn poll_swarm<P: StoreParams>(
618
615
swarm. behaviour_mut ( ) . swarm_events ( tx) ;
619
616
match swarm. listen_on ( addr. clone ( ) ) {
620
617
Ok ( listener) => executor
621
- . spawn (
622
- rx. take_while ( move |event| match event {
623
- Event :: ListenerClosed ( id) if * id == listener => {
624
- future:: ready ( false )
625
- }
626
- Event :: NewListenAddr ( id, addr) if * id == listener => {
627
- future:: ready (
628
- response
629
- . unbounded_send ( ListenerEvent :: NewListenAddr (
630
- addr. clone ( ) ,
631
- ) )
632
- . is_ok ( ) ,
633
- )
634
- }
635
- Event :: ExpiredListenAddr ( id, addr) if * id == listener => {
636
- future:: ready (
637
- response
638
- . unbounded_send ( ListenerEvent :: ExpiredListenAddr (
639
- addr. clone ( ) ,
640
- ) )
641
- . is_ok ( ) ,
642
- )
643
- }
644
- _ => future:: ready ( true ) ,
645
- } )
646
- . for_each ( |_| future:: ready ( ( ) ) ) ,
647
- )
618
+ . spawn ( forward_listener_events ( listener, response, rx) )
648
619
. detach ( ) ,
649
620
Err ( error) => {
650
621
response
@@ -777,6 +748,28 @@ async fn poll_swarm<P: StoreParams>(
777
748
}
778
749
}
779
750
751
+ fn forward_listener_events (
752
+ listener : ListenerId ,
753
+ response : UnboundedSender < ListenerEvent > ,
754
+ rx : UnboundedReceiver < Event > ,
755
+ ) -> impl Future < Output = ( ) > {
756
+ rx. take_while ( move |event| match event {
757
+ Event :: ListenerClosed ( id) if * id == listener => future:: ready ( false ) ,
758
+ Event :: NewListenAddr ( id, addr) if * id == listener => future:: ready (
759
+ response
760
+ . unbounded_send ( ListenerEvent :: NewListenAddr ( addr. clone ( ) ) )
761
+ . is_ok ( ) ,
762
+ ) ,
763
+ Event :: ExpiredListenAddr ( id, addr) if * id == listener => future:: ready (
764
+ response
765
+ . unbounded_send ( ListenerEvent :: ExpiredListenAddr ( addr. clone ( ) ) )
766
+ . is_ok ( ) ,
767
+ ) ,
768
+ _ => future:: ready ( true ) ,
769
+ } )
770
+ . for_each ( |_| future:: ready ( ( ) ) )
771
+ }
772
+
780
773
#[ derive( Debug ) ]
781
774
pub struct GetQuery {
782
775
swarm : Sender < NetworkCommand > ,
@@ -790,7 +783,7 @@ impl Future for GetQuery {
790
783
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Self :: Output > {
791
784
match Pin :: new ( & mut self . rx ) . poll ( cx) {
792
785
Poll :: Ready ( Ok ( result) ) => Poll :: Ready ( result) ,
793
- Poll :: Ready ( Err ( err) ) => Poll :: Ready ( Err ( anyhow ! ( "{}" , err) ) ) ,
786
+ Poll :: Ready ( Err ( err) ) => Poll :: Ready ( Err ( err. into ( ) ) ) ,
794
787
Poll :: Pending => Poll :: Pending ,
795
788
}
796
789
}
@@ -799,7 +792,9 @@ impl Future for GetQuery {
799
792
impl Drop for GetQuery {
800
793
fn drop ( & mut self ) {
801
794
if let Err ( err) = self . swarm . try_send ( NetworkCommand :: CancelQuery ( self . id ) ) {
802
- tracing:: warn!( "cannot cancel dropped GetQuery: {}" , err. into_send_error( ) ) ;
795
+ if !err. is_disconnected ( ) {
796
+ tracing:: warn!( "cannot cancel dropped GetQuery: {}" , err. into_send_error( ) ) ;
797
+ }
803
798
}
804
799
}
805
800
}
@@ -852,10 +847,11 @@ impl Stream for SyncQuery {
852
847
853
848
impl Drop for SyncQuery {
854
849
fn drop ( & mut self ) {
855
- if let Some ( id) = self . id . take ( ) {
856
- let swarm = self . swarm . as_mut ( ) . unwrap ( ) ;
850
+ if let ( Some ( id) , Some ( mut swarm) ) = ( self . id . take ( ) , self . swarm . take ( ) ) {
857
851
if let Err ( err) = swarm. try_send ( NetworkCommand :: CancelQuery ( id) ) {
858
- tracing:: warn!( "cannot cancel dropped SyncQuery: {}" , err. into_send_error( ) ) ;
852
+ if !err. is_disconnected ( ) {
853
+ tracing:: warn!( "cannot cancel dropped SyncQuery: {}" , err. into_send_error( ) ) ;
854
+ }
859
855
}
860
856
}
861
857
}
0 commit comments