Skip to content

Commit cf3e4c6

Browse files
authored
feat(quic): implement hole punching
Implement `Transport::dial_as_listener` for QUIC as specified by the [DCUtR spec](https://github.com/libp2p/specs/blob/master/relay/DCUtR.md). To facilitate hole punching in QUIC, one side needs to send random UDP packets to establish a mapping in the routing table of the NAT device. If successful, our listener will emit a new inbound connection. This connection needs to then be sent to the dialing task. We achieve this by storing a `HashMap` of hole punch attempts indexed by the remote's `SocketAddr`. A matching incoming connection is then sent via a oneshot channel to the dialing task which continues with upgrading the connection. Related #2883. Pull-Request: #3964.
1 parent 2a6311f commit cf3e4c6

File tree

14 files changed

+293
-78
lines changed

14 files changed

+293
-78
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/dcutr/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ env_logger = "0.10.0"
1111
futures = "0.3.28"
1212
futures-timer = "3.0"
1313
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "dcutr", "identify", "macros", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] }
14+
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }
1415
log = "0.4"

examples/dcutr/src/main.rs

+28-21
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
use clap::Parser;
2424
use futures::{
2525
executor::{block_on, ThreadPool},
26-
future::FutureExt,
26+
future::{Either, FutureExt},
2727
stream::StreamExt,
2828
};
2929
use libp2p::{
3030
core::{
3131
multiaddr::{Multiaddr, Protocol},
32-
transport::{OrTransport, Transport},
32+
muxing::StreamMuxerBox,
33+
transport::Transport,
3334
upgrade,
3435
},
3536
dcutr,
@@ -38,9 +39,9 @@ use libp2p::{
3839
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
3940
tcp, yamux, PeerId,
4041
};
42+
use libp2p_quic as quic;
4143
use log::info;
4244
use std::error::Error;
43-
use std::net::Ipv4Addr;
4445
use std::str::FromStr;
4546

4647
#[derive(Debug, Parser)]
@@ -91,19 +92,26 @@ fn main() -> Result<(), Box<dyn Error>> {
9192

9293
let (relay_transport, client) = relay::client::new(local_peer_id);
9394

94-
let transport = OrTransport::new(
95-
relay_transport,
96-
block_on(DnsConfig::system(tcp::async_io::Transport::new(
97-
tcp::Config::default().port_reuse(true),
98-
)))
99-
.unwrap(),
100-
)
101-
.upgrade(upgrade::Version::V1Lazy)
102-
.authenticate(
103-
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."),
104-
)
105-
.multiplex(yamux::Config::default())
106-
.boxed();
95+
let transport = {
96+
let relay_tcp_quic_transport = relay_transport
97+
.or_transport(tcp::async_io::Transport::new(
98+
tcp::Config::default().port_reuse(true),
99+
))
100+
.upgrade(upgrade::Version::V1)
101+
.authenticate(noise::Config::new(&local_key).unwrap())
102+
.multiplex(yamux::Config::default())
103+
.or_transport(quic::async_std::Transport::new(quic::Config::new(
104+
&local_key,
105+
)));
106+
107+
block_on(DnsConfig::system(relay_tcp_quic_transport))
108+
.unwrap()
109+
.map(|either_output, _| match either_output {
110+
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
111+
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
112+
})
113+
.boxed()
114+
};
107115

108116
#[derive(NetworkBehaviour)]
109117
#[behaviour(to_swarm = "Event")]
@@ -164,11 +172,10 @@ fn main() -> Result<(), Box<dyn Error>> {
164172
.build();
165173

166174
swarm
167-
.listen_on(
168-
Multiaddr::empty()
169-
.with("0.0.0.0".parse::<Ipv4Addr>().unwrap().into())
170-
.with(Protocol::Tcp(0)),
171-
)
175+
.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap())
176+
.unwrap();
177+
swarm
178+
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
172179
.unwrap();
173180

174181
// Wait to listen on all interfaces.

examples/relay-server/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ async-trait = "0.1"
1212
env_logger = "0.10.0"
1313
futures = "0.3.28"
1414
libp2p = { path = "../../libp2p", features = ["async-std", "noise", "macros", "ping", "tcp", "identify", "yamux", "relay"] }
15+
libp2p-quic = { path = "../../transports/quic", features = ["async-std"] }

examples/relay-server/src/main.rs

+25-5
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
#![doc = include_str!("../README.md")]
2323

2424
use clap::Parser;
25-
use futures::executor::block_on;
2625
use futures::stream::StreamExt;
26+
use futures::{executor::block_on, future::Either};
2727
use libp2p::{
2828
core::multiaddr::Protocol,
29+
core::muxing::StreamMuxerBox,
2930
core::upgrade,
3031
core::{Multiaddr, Transport},
3132
identify, identity,
@@ -34,6 +35,7 @@ use libp2p::{
3435
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
3536
tcp,
3637
};
38+
use libp2p_quic as quic;
3739
use std::error::Error;
3840
use std::net::{Ipv4Addr, Ipv6Addr};
3941

@@ -50,12 +52,21 @@ fn main() -> Result<(), Box<dyn Error>> {
5052

5153
let tcp_transport = tcp::async_io::Transport::default();
5254

53-
let transport = tcp_transport
55+
let tcp_transport = tcp_transport
5456
.upgrade(upgrade::Version::V1Lazy)
5557
.authenticate(
5658
noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."),
5759
)
58-
.multiplex(libp2p::yamux::Config::default())
60+
.multiplex(libp2p::yamux::Config::default());
61+
62+
let quic_transport = quic::async_std::Transport::new(quic::Config::new(&local_key));
63+
64+
let transport = quic_transport
65+
.or_transport(tcp_transport)
66+
.map(|either_output, _| match either_output {
67+
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
68+
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
69+
})
5970
.boxed();
6071

6172
let behaviour = Behaviour {
@@ -70,13 +81,22 @@ fn main() -> Result<(), Box<dyn Error>> {
7081
let mut swarm = SwarmBuilder::without_executor(transport, behaviour, local_peer_id).build();
7182

7283
// Listen on all interfaces
73-
let listen_addr = Multiaddr::empty()
84+
let listen_addr_tcp = Multiaddr::empty()
7485
.with(match opt.use_ipv6 {
7586
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
7687
_ => Protocol::from(Ipv4Addr::UNSPECIFIED),
7788
})
7889
.with(Protocol::Tcp(opt.port));
79-
swarm.listen_on(listen_addr)?;
90+
swarm.listen_on(listen_addr_tcp)?;
91+
92+
let listen_addr_quic = Multiaddr::empty()
93+
.with(match opt.use_ipv6 {
94+
Some(true) => Protocol::from(Ipv6Addr::UNSPECIFIED),
95+
_ => Protocol::from(Ipv4Addr::UNSPECIFIED),
96+
})
97+
.with(Protocol::Udp(opt.port))
98+
.with(Protocol::QuicV1);
99+
swarm.listen_on(listen_addr_quic)?;
80100

81101
block_on(async {
82102
loop {

transports/quic/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
- Raise MSRV to 1.65.
44
See [PR 3715].
55

6+
- Add hole punching support by implementing `Transport::dial_as_listener`. See [PR 3964].
7+
68
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
9+
[PR 3964]: https://github.com/libp2p/rust-libp2p/pull/3964
710

811
## 0.7.0-alpha.3
912

transports/quic/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ quinn-proto = { version = "0.10.1", default-features = false, features = ["tls-r
2323
rand = "0.8.5"
2424
rustls = { version = "0.21.1", default-features = false }
2525
thiserror = "1.0.40"
26-
tokio = { version = "1.28.2", default-features = false, features = ["net", "rt"], optional = true }
26+
tokio = { version = "1.28.2", default-features = false, features = ["net", "rt", "time"], optional = true }
2727

2828
[features]
2929
tokio = ["dep:tokio", "if-watch/tokio"]

transports/quic/src/endpoint.rs

+7
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,13 @@ impl Channel {
279279
Ok(Ok(()))
280280
}
281281

282+
pub(crate) async fn send(&mut self, to_endpoint: ToEndpoint) -> Result<(), Disconnected> {
283+
self.to_endpoint
284+
.send(to_endpoint)
285+
.await
286+
.map_err(|_| Disconnected {})
287+
}
288+
282289
/// Send a message to inform the [`Driver`] about an
283290
/// event caused by the owner of this [`Channel`] dropping.
284291
/// This clones the sender to the endpoint to guarantee delivery.

transports/quic/src/hole_punching.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use std::{net::SocketAddr, time::Duration};
2+
3+
use futures::future::Either;
4+
use rand::{distributions, Rng};
5+
6+
use crate::{
7+
endpoint::{self, ToEndpoint},
8+
Error, Provider,
9+
};
10+
11+
pub(crate) async fn hole_puncher<P: Provider>(
12+
endpoint_channel: endpoint::Channel,
13+
remote_addr: SocketAddr,
14+
timeout_duration: Duration,
15+
) -> Error {
16+
let punch_holes_future = punch_holes::<P>(endpoint_channel, remote_addr);
17+
futures::pin_mut!(punch_holes_future);
18+
match futures::future::select(P::sleep(timeout_duration), punch_holes_future).await {
19+
Either::Left(_) => Error::HandshakeTimedOut,
20+
Either::Right((hole_punch_err, _)) => hole_punch_err,
21+
}
22+
}
23+
24+
async fn punch_holes<P: Provider>(
25+
mut endpoint_channel: endpoint::Channel,
26+
remote_addr: SocketAddr,
27+
) -> Error {
28+
loop {
29+
let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200));
30+
P::sleep(sleep_duration).await;
31+
32+
let random_udp_packet = ToEndpoint::SendUdpPacket(quinn_proto::Transmit {
33+
destination: remote_addr,
34+
ecn: None,
35+
contents: rand::thread_rng()
36+
.sample_iter(distributions::Standard)
37+
.take(64)
38+
.collect(),
39+
segment_size: None,
40+
src_ip: None,
41+
});
42+
43+
if endpoint_channel.send(random_udp_packet).await.is_err() {
44+
return Error::EndpointDriverCrashed;
45+
}
46+
}
47+
}

transports/quic/src/lib.rs

+11
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,12 @@
5959

6060
mod connection;
6161
mod endpoint;
62+
mod hole_punching;
6263
mod provider;
6364
mod transport;
6465

66+
use std::net::SocketAddr;
67+
6568
pub use connection::{Connecting, Connection, Substream};
6669
pub use endpoint::Config;
6770
#[cfg(feature = "async-std")]
@@ -94,6 +97,14 @@ pub enum Error {
9497
/// The [`Connecting`] future timed out.
9598
#[error("Handshake with the remote timed out.")]
9699
HandshakeTimedOut,
100+
101+
/// Error when `Transport::dial_as_listener` is called without an active listener.
102+
#[error("Tried to dial as listener without an active listener.")]
103+
NoActiveListenerForDialAsListener,
104+
105+
/// Error when holepunching for a remote is already in progress
106+
#[error("Already punching hole for {0}).")]
107+
HolePunchInProgress(SocketAddr),
97108
}
98109

99110
/// Dialing a remote peer failed.

transports/quic/src/provider.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use futures::Future;
21+
use futures::{future::BoxFuture, Future};
2222
use if_watch::IfEvent;
2323
use std::{
2424
io,
2525
net::SocketAddr,
2626
task::{Context, Poll},
27+
time::Duration,
2728
};
2829

2930
#[cfg(feature = "async-std")]
@@ -74,4 +75,7 @@ pub trait Provider: Unpin + Send + Sized + 'static {
7475
watcher: &mut Self::IfWatcher,
7576
cx: &mut Context<'_>,
7677
) -> Poll<io::Result<IfEvent>>;
78+
79+
/// Sleep for specified amount of time.
80+
fn sleep(duration: Duration) -> BoxFuture<'static, ()>;
7781
}

transports/quic/src/provider/async_std.rs

+5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use std::{
2626
pin::Pin,
2727
sync::Arc,
2828
task::{Context, Poll},
29+
time::Duration,
2930
};
3031

3132
use crate::GenTransport;
@@ -104,6 +105,10 @@ impl super::Provider for Provider {
104105
) -> Poll<io::Result<if_watch::IfEvent>> {
105106
watcher.poll_if_event(cx)
106107
}
108+
109+
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
110+
async_std::task::sleep(duration).boxed()
111+
}
107112
}
108113

109114
type ReceiveStreamItem = (

transports/quic/src/provider/tokio.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use futures::{ready, Future};
21+
use futures::{future::BoxFuture, ready, Future, FutureExt};
2222
use std::{
2323
io,
2424
net::SocketAddr,
2525
task::{Context, Poll},
26+
time::Duration,
2627
};
2728
use tokio::{io::ReadBuf, net::UdpSocket};
2829

@@ -95,4 +96,8 @@ impl super::Provider for Provider {
9596
) -> Poll<io::Result<if_watch::IfEvent>> {
9697
watcher.poll_if_event(cx)
9798
}
99+
100+
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
101+
tokio::time::sleep(duration).boxed()
102+
}
98103
}

0 commit comments

Comments
 (0)