@@ -48,7 +48,7 @@ use quinn::{AsyncUdpSocket, ServerConfig};
48
48
use rand:: { seq:: SliceRandom , Rng , SeedableRng } ;
49
49
use relay_actor:: RelaySendItem ;
50
50
use smallvec:: { smallvec, SmallVec } ;
51
- use tokio:: sync:: { self , mpsc, Mutex } ;
51
+ use tokio:: sync:: { self , mpsc, oneshot , Mutex } ;
52
52
use tokio_util:: sync:: CancellationToken ;
53
53
use tracing:: {
54
54
debug, error, error_span, event, info, info_span, instrument, trace, trace_span, warn,
@@ -75,8 +75,8 @@ use crate::{
75
75
discovery:: { Discovery , DiscoveryItem , DiscoverySubscribers , NodeData , UserData } ,
76
76
key:: { public_ed_box, secret_ed_box, DecryptionError , SharedSecret } ,
77
77
metrics:: EndpointMetrics ,
78
- net_report:: { self , IpMappedAddresses , Report } ,
79
- watcher:: { self , Watchable } ,
78
+ net_report:: { self , IpMappedAddresses , NetReporter , Report } ,
79
+ watcher:: { self , Watchable , Watcher } ,
80
80
} ;
81
81
82
82
mod metrics;
@@ -1587,6 +1587,26 @@ impl MagicSock {
1587
1587
discovery. publish ( & data) ;
1588
1588
}
1589
1589
}
1590
+
1591
+ /// Run a net-report, outside of the usual net-report cycle. This is for
1592
+ /// diagnostic purposes only, and will not effect the usual net-report
1593
+ /// run cycle nor adjust the
1594
+ async fn run_diagnostic_net_report ( & self ) -> Result < oneshot:: Receiver < Result < NetReporter > > > {
1595
+ let is_running = net_report:: Client :: is_running ( & self . net_reporter ) . await ?;
1596
+ // wait for any current runs to complete
1597
+ // before requesting a run
1598
+ let mut is_running = is_running. stream ( ) ;
1599
+ while let Some ( is_running) = is_running. next ( ) . await {
1600
+ if is_running {
1601
+ continue ;
1602
+ }
1603
+ break ;
1604
+ }
1605
+ let ( tx, rx) = oneshot:: channel ( ) ;
1606
+ let msg = ActorMessage :: DiagnosticNetReport ( tx) ;
1607
+ self . actor_sender . send ( msg) . await ?;
1608
+ Ok ( rx)
1609
+ }
1590
1610
}
1591
1611
1592
1612
#[ derive( Clone , Debug ) ]
@@ -1985,6 +2005,24 @@ impl Handle {
1985
2005
}
1986
2006
trace ! ( "magicsock closed" ) ;
1987
2007
}
2008
+
2009
+ /// Run a diagnosic net-report check, that waits for all
2010
+ /// probes to return before ending the run (within the
2011
+ /// timeout).
2012
+ ///
2013
+ /// This does not interfere with the normal net-report run
2014
+ /// and does not update the known public addresses or
2015
+ /// adjust the known latency for any relay nodes, it is
2016
+ /// strictly for diagnostic purposes.
2017
+ ///
2018
+ /// Return a [`NetReporter`], allowing you to iterate
2019
+ /// over all of the returned probes using `.next()`, or
2020
+ /// you can just `.await` the [`NetReporter`] to get
2021
+ /// the [`Report`].
2022
+ pub ( crate ) async fn run_diagnostic_net_report ( & self ) -> Result < NetReporter > {
2023
+ let rx = self . msock . run_diagnostic_net_report ( ) . await ?;
2024
+ rx. await ?
2025
+ }
1988
2026
}
1989
2027
1990
2028
#[ derive( Debug , Default ) ]
@@ -2335,6 +2373,7 @@ enum ActorMessage {
2335
2373
Shutdown ,
2336
2374
EndpointPingExpired ( usize , stun_rs:: TransactionId ) ,
2337
2375
NetReport ( Result < Option < Arc < net_report:: Report > > > , & ' static str ) ,
2376
+ DiagnosticNetReport ( oneshot:: Sender < anyhow:: Result < NetReporter > > ) ,
2338
2377
NetworkChange ,
2339
2378
#[ cfg( test) ]
2340
2379
ForceNetworkChange ( bool ) ,
@@ -2676,6 +2715,9 @@ impl Actor {
2676
2715
ActorMessage :: ForceNetworkChange ( is_major) => {
2677
2716
self . handle_network_change ( is_major) . await ;
2678
2717
}
2718
+ ActorMessage :: DiagnosticNetReport ( tx) => {
2719
+ self . net_report_diagnostic ( tx) . await ;
2720
+ }
2679
2721
}
2680
2722
2681
2723
false
@@ -2876,6 +2918,39 @@ impl Actor {
2876
2918
self . net_info_last = Some ( ni) ;
2877
2919
}
2878
2920
2921
+ /// User requested a full diagnosic run of net-report, outside
2922
+ /// of the normal net report cycle.
2923
+ #[ instrument( level = "debug" , skip_all) ]
2924
+ async fn net_report_diagnostic ( & mut self , tx : oneshot:: Sender < Result < NetReporter > > ) {
2925
+ // Don't start a net report probe if we know
2926
+ // we are shutting down
2927
+ if self . msock . is_closing ( ) || self . msock . is_closed ( ) {
2928
+ tx. send ( Err ( anyhow ! (
2929
+ "magicsock is closed, cancelling net-report diagnostic"
2930
+ ) ) )
2931
+ . ok ( ) ;
2932
+ return ;
2933
+ }
2934
+ if self . msock . relay_map . is_empty ( ) {
2935
+ tx. send ( Err ( anyhow ! (
2936
+ "no relay nodes, cancelling net-report diagnostic"
2937
+ ) ) )
2938
+ . ok ( ) ;
2939
+ return ;
2940
+ }
2941
+ let relay_map = self . msock . relay_map . clone ( ) ;
2942
+
2943
+ #[ cfg( wasm_browser) ]
2944
+ let opts = self . net_report_config . clone ( ) ;
2945
+ // run a non-sparse report, meaning the report will ensure
2946
+ // that each probe protocol response is received for each relay
2947
+ // before finishing
2948
+ #[ cfg( not( wasm_browser) ) ]
2949
+ let opts = self . net_report_config . clone ( ) . sparse ( false ) ;
2950
+ let res = self . net_reporter . get_report_channel ( relay_map, opts) . await ;
2951
+ tx. send ( res) . ok ( ) ;
2952
+ }
2953
+
2879
2954
/// Calls net_report.
2880
2955
///
2881
2956
/// Note that invoking this is managed by [`DirectAddrUpdateState`] via
@@ -2908,9 +2983,8 @@ impl Actor {
2908
2983
task:: spawn ( async move {
2909
2984
let report = time:: timeout ( NET_REPORT_TIMEOUT , rx) . await ;
2910
2985
let report: anyhow:: Result < _ > = match report {
2911
- Ok ( Ok ( Ok ( report) ) ) => Ok ( Some ( report) ) ,
2912
- Ok ( Ok ( Err ( err) ) ) => Err ( err) ,
2913
- Ok ( Err ( _) ) => Err ( anyhow ! ( "net_report report not received" ) ) ,
2986
+ Ok ( Ok ( report) ) => Ok ( Some ( report) ) ,
2987
+ Ok ( Err ( err) ) => Err ( err) ,
2914
2988
Err ( err) => Err ( anyhow ! ( "net_report report timeout: {:?}" , err) ) ,
2915
2989
} ;
2916
2990
msg_sender
@@ -3456,9 +3530,7 @@ mod tests {
3456
3530
use crate :: {
3457
3531
defaults:: staging:: { self , EU_RELAY_HOSTNAME } ,
3458
3532
dns:: DnsResolver ,
3459
- tls,
3460
- watcher:: Watcher as _,
3461
- Endpoint , RelayMode ,
3533
+ tls, Endpoint , RelayMode ,
3462
3534
} ;
3463
3535
3464
3536
const ALPN : & [ u8 ] = b"n0/test/1" ;
@@ -4487,4 +4559,28 @@ mod tests {
4487
4559
4488
4560
Ok ( ( ) )
4489
4561
}
4562
+
4563
+ #[ tokio:: test]
4564
+ async fn multiple_net_report_runs ( ) -> Result < ( ) > {
4565
+ let stack = MagicStack :: new ( RelayMode :: Default ) . await ?;
4566
+ let ep = stack. endpoint ;
4567
+ println ! ( "running multiple net-report checks at once" ) ;
4568
+ let mut reporter0 = ep. magic_sock ( ) . run_diagnostic_net_report ( ) . await ?;
4569
+ let mut reporter1 = ep. magic_sock ( ) . run_diagnostic_net_report ( ) . await ?;
4570
+ let mut set = JoinSet :: new ( ) ;
4571
+ set. spawn ( async move {
4572
+ while let Some ( probe_result) = reporter0. next ( ) . await {
4573
+ println ! ( "probe from reporter0: {probe_result}" ) ;
4574
+ }
4575
+ println ! ( "probe report from reporter0: {:?}" , reporter0. await ) ;
4576
+ } ) ;
4577
+ set. spawn ( async move {
4578
+ while let Some ( probe_result) = reporter1. next ( ) . await {
4579
+ println ! ( "probe from reporter1: {probe_result}" ) ;
4580
+ }
4581
+ println ! ( "probe report from reporter1: {:?}" , reporter1. await ) ;
4582
+ } ) ;
4583
+ set. join_all ( ) . await ;
4584
+ Ok ( ( ) )
4585
+ }
4490
4586
}
0 commit comments