Skip to content

Commit 4e1fa9b

Browse files
authoredJul 28, 2023
refactor(quic): rewrite quic using quinn
Rewrite quic using quinn instead of quinn-proto. libp2p-quic::endpoint::Driver is eliminated (and that hard quinn-proto machinery). Also: - ECN bits are handled - Support Generic Send Offload (GSO) Pull-Request: #3454.
1 parent f10f1a2 commit 4e1fa9b

17 files changed

+608
-1803
lines changed
 

‎Cargo.lock

+35-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ libp2p-perf = { version = "0.2.0", path = "protocols/perf" }
8383
libp2p-ping = { version = "0.43.0", path = "protocols/ping" }
8484
libp2p-plaintext = { version = "0.40.0", path = "transports/plaintext" }
8585
libp2p-pnet = { version = "0.23.0", path = "transports/pnet" }
86-
libp2p-quic = { version = "0.8.0-alpha", path = "transports/quic" }
86+
libp2p-quic = { version = "0.9.0-alpha", path = "transports/quic" }
8787
libp2p-relay = { version = "0.16.1", path = "protocols/relay" }
8888
libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" }
8989
libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" }

‎transports/quic/CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## 0.9.0-alpha - unreleased
2+
3+
- Use `quinn` instead of `quinn-proto`.
4+
See [PR 3454].
5+
6+
[PR 3454]: https://github.com/libp2p/rust-libp2p/pull/3454
7+
18
## 0.8.0-alpha
29

310
- Raise MSRV to 1.65.

‎transports/quic/Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "libp2p-quic"
3-
version = "0.8.0-alpha"
3+
version = "0.9.0-alpha"
44
authors = ["Parity Technologies <admin@parity.io>"]
55
edition = "2021"
66
rust-version = { workspace = true }
@@ -19,15 +19,15 @@ libp2p-tls = { workspace = true }
1919
libp2p-identity = { workspace = true }
2020
log = "0.4"
2121
parking_lot = "0.12.0"
22-
quinn-proto = { version = "0.10.1", default-features = false, features = ["tls-rustls"] }
22+
quinn = { version = "0.10.1", default-features = false, features = ["tls-rustls", "futures-io"] }
2323
rand = "0.8.5"
2424
rustls = { version = "0.21.2", default-features = false }
2525
thiserror = "1.0.44"
2626
tokio = { version = "1.29.1", default-features = false, features = ["net", "rt", "time"], optional = true }
2727

2828
[features]
29-
tokio = ["dep:tokio", "if-watch/tokio"]
30-
async-std = ["dep:async-std", "if-watch/smol"]
29+
tokio = ["dep:tokio", "if-watch/tokio", "quinn/runtime-tokio"]
30+
async-std = ["dep:async-std", "if-watch/smol", "quinn/runtime-async-std"]
3131

3232
# Passing arguments to the docsrs builder in order to properly document cfg's.
3333
# More information: https://docs.rs/about/builds#cross-compiling

‎transports/quic/src/config.rs

+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use quinn::VarInt;
22+
use std::{sync::Arc, time::Duration};
23+
24+
/// Config for the transport.
25+
#[derive(Clone)]
26+
pub struct Config {
27+
/// Timeout for the initial handshake when establishing a connection.
28+
/// The actual timeout is the minimum of this and the [`Config::max_idle_timeout`].
29+
pub handshake_timeout: Duration,
30+
/// Maximum duration of inactivity in ms to accept before timing out the connection.
31+
pub max_idle_timeout: u32,
32+
/// Period of inactivity before sending a keep-alive packet.
33+
/// Must be set lower than the idle_timeout of both
34+
/// peers to be effective.
35+
///
36+
/// See [`quinn::TransportConfig::keep_alive_interval`] for more
37+
/// info.
38+
pub keep_alive_interval: Duration,
39+
/// Maximum number of incoming bidirectional streams that may be open
40+
/// concurrently by the remote peer.
41+
pub max_concurrent_stream_limit: u32,
42+
43+
/// Max unacknowledged data in bytes that may be send on a single stream.
44+
pub max_stream_data: u32,
45+
46+
/// Max unacknowledged data in bytes that may be send in total on all streams
47+
/// of a connection.
48+
pub max_connection_data: u32,
49+
50+
/// Support QUIC version draft-29 for dialing and listening.
51+
///
52+
/// Per default only QUIC Version 1 / [`libp2p_core::multiaddr::Protocol::QuicV1`]
53+
/// is supported.
54+
///
55+
/// If support for draft-29 is enabled servers support draft-29 and version 1 on all
56+
/// QUIC listening addresses.
57+
/// As client the version is chosen based on the remote's address.
58+
pub support_draft_29: bool,
59+
60+
/// TLS client config for the inner [`quinn::ClientConfig`].
61+
client_tls_config: Arc<rustls::ClientConfig>,
62+
/// TLS server config for the inner [`quinn::ServerConfig`].
63+
server_tls_config: Arc<rustls::ServerConfig>,
64+
}
65+
66+
impl Config {
67+
/// Creates a new configuration object with default values.
68+
pub fn new(keypair: &libp2p_identity::Keypair) -> Self {
69+
let client_tls_config = Arc::new(libp2p_tls::make_client_config(keypair, None).unwrap());
70+
let server_tls_config = Arc::new(libp2p_tls::make_server_config(keypair).unwrap());
71+
Self {
72+
client_tls_config,
73+
server_tls_config,
74+
support_draft_29: false,
75+
handshake_timeout: Duration::from_secs(5),
76+
max_idle_timeout: 30 * 1000,
77+
max_concurrent_stream_limit: 256,
78+
keep_alive_interval: Duration::from_secs(15),
79+
max_connection_data: 15_000_000,
80+
81+
// Ensure that one stream is not consuming the whole connection.
82+
max_stream_data: 10_000_000,
83+
}
84+
}
85+
}
86+
87+
/// Represents the inner configuration for [`quinn`].
88+
#[derive(Debug, Clone)]
89+
pub(crate) struct QuinnConfig {
90+
pub(crate) client_config: quinn::ClientConfig,
91+
pub(crate) server_config: quinn::ServerConfig,
92+
pub(crate) endpoint_config: quinn::EndpointConfig,
93+
}
94+
95+
impl From<Config> for QuinnConfig {
96+
fn from(config: Config) -> QuinnConfig {
97+
let Config {
98+
client_tls_config,
99+
server_tls_config,
100+
max_idle_timeout,
101+
max_concurrent_stream_limit,
102+
keep_alive_interval,
103+
max_connection_data,
104+
max_stream_data,
105+
support_draft_29,
106+
handshake_timeout: _,
107+
} = config;
108+
let mut transport = quinn::TransportConfig::default();
109+
// Disable uni-directional streams.
110+
transport.max_concurrent_uni_streams(0u32.into());
111+
transport.max_concurrent_bidi_streams(max_concurrent_stream_limit.into());
112+
// Disable datagrams.
113+
transport.datagram_receive_buffer_size(None);
114+
transport.keep_alive_interval(Some(keep_alive_interval));
115+
transport.max_idle_timeout(Some(VarInt::from_u32(max_idle_timeout).into()));
116+
transport.allow_spin(false);
117+
transport.stream_receive_window(max_stream_data.into());
118+
transport.receive_window(max_connection_data.into());
119+
let transport = Arc::new(transport);
120+
121+
let mut server_config = quinn::ServerConfig::with_crypto(server_tls_config);
122+
server_config.transport = Arc::clone(&transport);
123+
// Disables connection migration.
124+
// Long-term this should be enabled, however we then need to handle address change
125+
// on connections in the `Connection`.
126+
server_config.migration(false);
127+
128+
let mut client_config = quinn::ClientConfig::new(client_tls_config);
129+
client_config.transport_config(transport);
130+
131+
let mut endpoint_config = quinn::EndpointConfig::default();
132+
if !support_draft_29 {
133+
endpoint_config.supported_versions(vec![1]);
134+
}
135+
136+
QuinnConfig {
137+
client_config,
138+
server_config,
139+
endpoint_config,
140+
}
141+
}
142+
}

‎transports/quic/src/connection.rs

+63-359
Large diffs are not rendered by default.

‎transports/quic/src/connection/connecting.rs

+33-48
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@
2020

2121
//! Future that drives a QUIC connection until is has performed its TLS handshake.
2222
23-
use crate::{Connection, Error};
23+
use crate::{Connection, ConnectionError, Error};
2424

25-
use futures::prelude::*;
25+
use futures::{
26+
future::{select, Either, FutureExt, Select},
27+
prelude::*,
28+
};
2629
use futures_timer::Delay;
2730
use libp2p_identity::PeerId;
2831
use std::{
@@ -34,64 +37,46 @@ use std::{
3437
/// A QUIC connection currently being negotiated.
3538
#[derive(Debug)]
3639
pub struct Connecting {
37-
connection: Option<Connection>,
38-
timeout: Delay,
40+
connecting: Select<quinn::Connecting, Delay>,
3941
}
4042

4143
impl Connecting {
42-
pub(crate) fn new(connection: Connection, timeout: Duration) -> Self {
44+
pub(crate) fn new(connection: quinn::Connecting, timeout: Duration) -> Self {
4345
Connecting {
44-
connection: Some(connection),
45-
timeout: Delay::new(timeout),
46+
connecting: select(connection, Delay::new(timeout)),
4647
}
4748
}
4849
}
4950

51+
impl Connecting {
52+
/// Returns the address of the node we're connected to.
53+
/// Panics if the connection is still handshaking.
54+
fn remote_peer_id(connection: &quinn::Connection) -> PeerId {
55+
let identity = connection
56+
.peer_identity()
57+
.expect("connection got identity because it passed TLS handshake; qed");
58+
let certificates: Box<Vec<rustls::Certificate>> =
59+
identity.downcast().expect("we rely on rustls feature; qed");
60+
let end_entity = certificates
61+
.get(0)
62+
.expect("there should be exactly one certificate; qed");
63+
let p2p_cert = libp2p_tls::certificate::parse(end_entity)
64+
.expect("the certificate was validated during TLS handshake; qed");
65+
p2p_cert.peer_id()
66+
}
67+
}
68+
5069
impl Future for Connecting {
5170
type Output = Result<(PeerId, Connection), Error>;
5271

5372
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54-
let connection = self
55-
.connection
56-
.as_mut()
57-
.expect("Future polled after it has completed");
58-
59-
loop {
60-
let event = match connection.poll_event(cx) {
61-
Poll::Ready(Some(event)) => event,
62-
Poll::Ready(None) => return Poll::Ready(Err(Error::EndpointDriverCrashed)),
63-
Poll::Pending => {
64-
return self
65-
.timeout
66-
.poll_unpin(cx)
67-
.map(|()| Err(Error::HandshakeTimedOut));
68-
}
69-
};
70-
match event {
71-
quinn_proto::Event::Connected => {
72-
// Parse the remote's Id identity from the certificate.
73-
let identity = connection
74-
.peer_identity()
75-
.expect("connection got identity because it passed TLS handshake; qed");
76-
let certificates: Box<Vec<rustls::Certificate>> =
77-
identity.downcast().expect("we rely on rustls feature; qed");
78-
let end_entity = certificates
79-
.get(0)
80-
.expect("there should be exactly one certificate; qed");
81-
let p2p_cert = libp2p_tls::certificate::parse(end_entity)
82-
.expect("the certificate was validated during TLS handshake; qed");
83-
let peer_id = p2p_cert.peer_id();
73+
let connection = match futures::ready!(self.connecting.poll_unpin(cx)) {
74+
Either::Right(_) => return Poll::Ready(Err(Error::HandshakeTimedOut)),
75+
Either::Left((connection, _)) => connection.map_err(ConnectionError)?,
76+
};
8477

85-
return Poll::Ready(Ok((peer_id, self.connection.take().unwrap())));
86-
}
87-
quinn_proto::Event::ConnectionLost { reason } => {
88-
return Poll::Ready(Err(Error::Connection(reason.into())))
89-
}
90-
quinn_proto::Event::HandshakeDataReady | quinn_proto::Event::Stream(_) => {}
91-
quinn_proto::Event::DatagramReceived => {
92-
debug_assert!(false, "Datagrams are not supported")
93-
}
94-
}
95-
}
78+
let peer_id = Self::remote_peer_id(&connection);
79+
let muxer = Connection::new(connection);
80+
Poll::Ready(Ok((peer_id, muxer)))
9681
}
9782
}
+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2022 Protocol Labs.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use std::{
22+
io::{self},
23+
pin::Pin,
24+
task::{Context, Poll},
25+
};
26+
27+
use futures::{AsyncRead, AsyncWrite};
28+
29+
/// A single stream on a connection
30+
pub struct Stream {
31+
/// A send part of the stream
32+
send: quinn::SendStream,
33+
/// A receive part of the stream
34+
recv: quinn::RecvStream,
35+
/// Whether the stream is closed or not
36+
close_result: Option<Result<(), io::ErrorKind>>,
37+
}
38+
39+
impl Stream {
40+
pub(super) fn new(send: quinn::SendStream, recv: quinn::RecvStream) -> Self {
41+
Self {
42+
send,
43+
recv,
44+
close_result: None,
45+
}
46+
}
47+
}
48+
49+
impl AsyncRead for Stream {
50+
fn poll_read(
51+
mut self: Pin<&mut Self>,
52+
cx: &mut Context,
53+
buf: &mut [u8],
54+
) -> Poll<io::Result<usize>> {
55+
if let Some(close_result) = self.close_result {
56+
if close_result.is_err() {
57+
return Poll::Ready(Ok(0));
58+
}
59+
}
60+
Pin::new(&mut self.recv).poll_read(cx, buf)
61+
}
62+
}
63+
64+
impl AsyncWrite for Stream {
65+
fn poll_write(
66+
mut self: Pin<&mut Self>,
67+
cx: &mut Context,
68+
buf: &[u8],
69+
) -> Poll<io::Result<usize>> {
70+
Pin::new(&mut self.send).poll_write(cx, buf)
71+
}
72+
73+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
74+
Pin::new(&mut self.send).poll_flush(cx)
75+
}
76+
77+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
78+
if let Some(close_result) = self.close_result {
79+
// For some reason poll_close needs to be 'fuse'able
80+
return Poll::Ready(close_result.map_err(Into::into));
81+
}
82+
let close_result = futures::ready!(Pin::new(&mut self.send).poll_close(cx));
83+
self.close_result = Some(close_result.as_ref().map_err(|e| e.kind()).copied());
84+
Poll::Ready(close_result)
85+
}
86+
}

‎transports/quic/src/connection/substream.rs

-257
This file was deleted.

‎transports/quic/src/endpoint.rs

-674
This file was deleted.

‎transports/quic/src/hole_punching.rs

+14-22
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,39 @@
1-
use std::{net::SocketAddr, time::Duration};
1+
use crate::{provider::Provider, Error};
22

33
use futures::future::Either;
4+
45
use rand::{distributions, Rng};
56

6-
use crate::{
7-
endpoint::{self, ToEndpoint},
8-
Error, Provider,
7+
use std::{
8+
net::{SocketAddr, UdpSocket},
9+
time::Duration,
910
};
1011

1112
pub(crate) async fn hole_puncher<P: Provider>(
12-
endpoint_channel: endpoint::Channel,
13+
socket: UdpSocket,
1314
remote_addr: SocketAddr,
1415
timeout_duration: Duration,
1516
) -> Error {
16-
let punch_holes_future = punch_holes::<P>(endpoint_channel, remote_addr);
17+
let punch_holes_future = punch_holes::<P>(socket, remote_addr);
1718
futures::pin_mut!(punch_holes_future);
1819
match futures::future::select(P::sleep(timeout_duration), punch_holes_future).await {
1920
Either::Left(_) => Error::HandshakeTimedOut,
2021
Either::Right((hole_punch_err, _)) => hole_punch_err,
2122
}
2223
}
2324

24-
async fn punch_holes<P: Provider>(
25-
mut endpoint_channel: endpoint::Channel,
26-
remote_addr: SocketAddr,
27-
) -> Error {
25+
async fn punch_holes<P: Provider>(socket: UdpSocket, remote_addr: SocketAddr) -> Error {
2826
loop {
2927
let sleep_duration = Duration::from_millis(rand::thread_rng().gen_range(10..=200));
3028
P::sleep(sleep_duration).await;
3129

32-
let random_udp_packet = ToEndpoint::SendUdpPacket(quinn_proto::Transmit {
33-
destination: remote_addr,
34-
ecn: None,
35-
contents: rand::thread_rng()
36-
.sample_iter(distributions::Standard)
37-
.take(64)
38-
.collect(),
39-
segment_size: None,
40-
src_ip: None,
41-
});
30+
let contents: Vec<u8> = rand::thread_rng()
31+
.sample_iter(distributions::Standard)
32+
.take(64)
33+
.collect();
4234

43-
if endpoint_channel.send(random_udp_packet).await.is_err() {
44-
return Error::EndpointDriverCrashed;
35+
if let Err(e) = P::send_to(&socket, &contents, remote_addr).await {
36+
return Error::Io(e);
4537
}
4638
}
4739
}

‎transports/quic/src/lib.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,17 @@
5757
5858
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
5959

60+
mod config;
6061
mod connection;
61-
mod endpoint;
6262
mod hole_punching;
6363
mod provider;
6464
mod transport;
6565

6666
use std::net::SocketAddr;
6767

68-
pub use connection::{Connecting, Connection, Substream};
69-
pub use endpoint::Config;
68+
pub use config::Config;
69+
pub use connection::{Connecting, Connection, Stream};
70+
7071
#[cfg(feature = "async-std")]
7172
pub use provider::async_std;
7273
#[cfg(feature = "tokio")]
@@ -89,8 +90,7 @@ pub enum Error {
8990
#[error(transparent)]
9091
Io(#[from] std::io::Error),
9192

92-
/// The task spawned in [`Provider::spawn`] to drive
93-
/// the quic endpoint has crashed.
93+
/// The task to drive a quic endpoint has crashed.
9494
#[error("Endpoint driver crashed")]
9595
EndpointDriverCrashed,
9696

@@ -110,9 +110,9 @@ pub enum Error {
110110
/// Dialing a remote peer failed.
111111
#[derive(Debug, thiserror::Error)]
112112
#[error(transparent)]
113-
pub struct ConnectError(#[from] quinn_proto::ConnectError);
113+
pub struct ConnectError(quinn::ConnectError);
114114

115115
/// Error on an established [`Connection`].
116116
#[derive(Debug, thiserror::Error)]
117117
#[error(transparent)]
118-
pub struct ConnectionError(#[from] quinn_proto::ConnectionError);
118+
pub struct ConnectionError(quinn::ConnectionError);

‎transports/quic/src/provider.rs

+19-32
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use futures::{future::BoxFuture, Future};
21+
use futures::future::BoxFuture;
2222
use if_watch::IfEvent;
2323
use std::{
2424
io,
25-
net::SocketAddr,
25+
net::{SocketAddr, UdpSocket},
2626
task::{Context, Poll},
2727
time::Duration,
2828
};
@@ -32,40 +32,20 @@ pub mod async_std;
3232
#[cfg(feature = "tokio")]
3333
pub mod tokio;
3434

35-
/// Size of the buffer for reading data 0x10000.
36-
#[cfg(any(feature = "async-std", feature = "tokio"))]
37-
const RECEIVE_BUFFER_SIZE: usize = 65536;
35+
pub enum Runtime {
36+
#[cfg(feature = "tokio")]
37+
Tokio,
38+
#[cfg(feature = "async-std")]
39+
AsyncStd,
40+
Dummy,
41+
}
3842

39-
/// Provider for non-blocking receiving and sending on a [`std::net::UdpSocket`]
40-
/// and spawning tasks.
43+
/// Provider for a corresponding quinn runtime and spawning tasks.
4144
pub trait Provider: Unpin + Send + Sized + 'static {
4245
type IfWatcher: Unpin + Send;
4346

44-
/// Create a new providing that is wrapping the socket.
45-
///
46-
/// Note: The socket must be set to non-blocking.
47-
fn from_socket(socket: std::net::UdpSocket) -> io::Result<Self>;
48-
49-
/// Receive a single packet.
50-
///
51-
/// Returns the message and the address the message came from.
52-
fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<(Vec<u8>, SocketAddr)>>;
53-
54-
/// Set sending a packet on the socket.
55-
///
56-
/// Since only one packet can be sent at a time, this may only be called if a preceding
57-
/// call to [`Provider::poll_send_flush`] returned [`Poll::Ready`].
58-
fn start_send(&mut self, data: Vec<u8>, addr: SocketAddr);
59-
60-
/// Flush a packet send in [`Provider::start_send`].
61-
///
62-
/// If [`Poll::Ready`] is returned the socket is ready for sending a new packet.
63-
fn poll_send_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;
64-
65-
/// Run the given future in the background until it ends.
66-
///
67-
/// This is used to spawn the task that is driving the endpoint.
68-
fn spawn(future: impl Future<Output = ()> + Send + 'static);
47+
/// Run the corresponding runtime
48+
fn runtime() -> Runtime;
6949

7050
/// Create a new [`if_watch`] watcher that reports [`IfEvent`]s for network interface changes.
7151
fn new_if_watcher() -> io::Result<Self::IfWatcher>;
@@ -78,4 +58,11 @@ pub trait Provider: Unpin + Send + Sized + 'static {
7858

7959
/// Sleep for specified amount of time.
8060
fn sleep(duration: Duration) -> BoxFuture<'static, ()>;
61+
62+
/// Sends data on the socket to the given address. On success, returns the number of bytes written.
63+
fn send_to<'a>(
64+
udp_socket: &'a UdpSocket,
65+
buf: &'a [u8],
66+
target: SocketAddr,
67+
) -> BoxFuture<'a, io::Result<usize>>;
8168
}

‎transports/quic/src/provider/async_std.rs

+16-102
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,10 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use async_std::{net::UdpSocket, task::spawn};
22-
use futures::{future::BoxFuture, ready, Future, FutureExt, Stream, StreamExt};
21+
use futures::{future::BoxFuture, FutureExt};
2322
use std::{
2423
io,
25-
net::SocketAddr,
26-
pin::Pin,
27-
sync::Arc,
24+
net::UdpSocket,
2825
task::{Context, Poll},
2926
time::Duration,
3027
};
@@ -34,65 +31,14 @@ use crate::GenTransport;
3431
/// Transport with [`async-std`] runtime.
3532
pub type Transport = GenTransport<Provider>;
3633

37-
/// Provider for reading / writing to a sockets and spawning
38-
/// tasks using [`async-std`].
39-
pub struct Provider {
40-
socket: Arc<UdpSocket>,
41-
// Future for sending a packet.
42-
// This is needed since [`async_Std::net::UdpSocket`] does not
43-
// provide a poll-style interface for sending a packet.
44-
send_packet: Option<BoxFuture<'static, Result<(), io::Error>>>,
45-
recv_stream: ReceiveStream,
46-
}
34+
/// Provider for quinn runtime and spawning tasks using [`async-std`].
35+
pub struct Provider;
4736

4837
impl super::Provider for Provider {
4938
type IfWatcher = if_watch::smol::IfWatcher;
5039

51-
fn from_socket(socket: std::net::UdpSocket) -> io::Result<Self> {
52-
let socket = Arc::new(socket.into());
53-
let recv_stream = ReceiveStream::new(Arc::clone(&socket));
54-
Ok(Provider {
55-
socket,
56-
send_packet: None,
57-
recv_stream,
58-
})
59-
}
60-
61-
fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<(Vec<u8>, SocketAddr)>> {
62-
match self.recv_stream.poll_next_unpin(cx) {
63-
Poll::Ready(ready) => {
64-
Poll::Ready(ready.expect("ReceiveStream::poll_next never returns None."))
65-
}
66-
Poll::Pending => Poll::Pending,
67-
}
68-
}
69-
70-
fn start_send(&mut self, data: Vec<u8>, addr: SocketAddr) {
71-
let socket = self.socket.clone();
72-
let send = async move {
73-
socket.send_to(&data, addr).await?;
74-
Ok(())
75-
}
76-
.boxed();
77-
self.send_packet = Some(send)
78-
}
79-
80-
fn poll_send_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
81-
let pending = match self.send_packet.as_mut() {
82-
Some(pending) => pending,
83-
None => return Poll::Ready(Ok(())),
84-
};
85-
match pending.poll_unpin(cx) {
86-
Poll::Ready(result) => {
87-
self.send_packet = None;
88-
Poll::Ready(result)
89-
}
90-
Poll::Pending => Poll::Pending,
91-
}
92-
}
93-
94-
fn spawn(future: impl Future<Output = ()> + Send + 'static) {
95-
spawn(future);
40+
fn runtime() -> super::Runtime {
41+
super::Runtime::AsyncStd
9642
}
9743

9844
fn new_if_watcher() -> io::Result<Self::IfWatcher> {
@@ -109,48 +55,16 @@ impl super::Provider for Provider {
10955
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
11056
async_std::task::sleep(duration).boxed()
11157
}
112-
}
113-
114-
type ReceiveStreamItem = (
115-
Result<(usize, SocketAddr), io::Error>,
116-
Arc<UdpSocket>,
117-
Vec<u8>,
118-
);
11958

120-
/// Wrapper around the socket to implement `Stream` on it.
121-
struct ReceiveStream {
122-
/// Future for receiving a packet on the socket.
123-
// This is needed since [`async_Std::net::UdpSocket`] does not
124-
// provide a poll-style interface for receiving packets.
125-
fut: BoxFuture<'static, ReceiveStreamItem>,
126-
}
127-
128-
impl ReceiveStream {
129-
fn new(socket: Arc<UdpSocket>) -> Self {
130-
let fut = ReceiveStream::next(socket, vec![0; super::RECEIVE_BUFFER_SIZE]).boxed();
131-
Self { fut: fut.boxed() }
132-
}
133-
134-
async fn next(socket: Arc<UdpSocket>, mut socket_recv_buffer: Vec<u8>) -> ReceiveStreamItem {
135-
let recv = socket.recv_from(&mut socket_recv_buffer).await;
136-
(recv, socket, socket_recv_buffer)
137-
}
138-
}
139-
140-
impl Stream for ReceiveStream {
141-
type Item = Result<(Vec<u8>, SocketAddr), io::Error>;
142-
143-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144-
let (result, socket, buffer) = ready!(self.fut.poll_unpin(cx));
145-
146-
let result = result.map(|(packet_len, packet_src)| {
147-
debug_assert!(packet_len <= buffer.len());
148-
// Copies the bytes from the `socket_recv_buffer` they were written into.
149-
(buffer[..packet_len].into(), packet_src)
150-
});
151-
// Set the future for receiving the next packet on the stream.
152-
self.fut = ReceiveStream::next(socket, buffer).boxed();
153-
154-
Poll::Ready(Some(result))
59+
fn send_to<'a>(
60+
udp_socket: &'a UdpSocket,
61+
buf: &'a [u8],
62+
target: std::net::SocketAddr,
63+
) -> BoxFuture<'a, io::Result<usize>> {
64+
Box::pin(async move {
65+
async_std::net::UdpSocket::from(udp_socket.try_clone()?)
66+
.send_to(buf, target)
67+
.await
68+
})
15569
}
15670
}

‎transports/quic/src/provider/tokio.rs

+18-51
Original file line numberDiff line numberDiff line change
@@ -18,72 +18,27 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
use futures::{future::BoxFuture, ready, Future, FutureExt};
21+
use futures::{future::BoxFuture, FutureExt};
2222
use std::{
2323
io,
24-
net::SocketAddr,
24+
net::{SocketAddr, UdpSocket},
2525
task::{Context, Poll},
2626
time::Duration,
2727
};
28-
use tokio::{io::ReadBuf, net::UdpSocket};
2928

3029
use crate::GenTransport;
3130

3231
/// Transport with [`tokio`] runtime.
3332
pub type Transport = GenTransport<Provider>;
3433

35-
/// Provider for reading / writing to a sockets and spawning
36-
/// tasks using [`tokio`].
37-
pub struct Provider {
38-
socket: UdpSocket,
39-
socket_recv_buffer: Vec<u8>,
40-
next_packet_out: Option<(Vec<u8>, SocketAddr)>,
41-
}
34+
/// Provider for quinn runtime and spawning tasks using [`tokio`].
35+
pub struct Provider;
4236

4337
impl super::Provider for Provider {
4438
type IfWatcher = if_watch::tokio::IfWatcher;
4539

46-
fn from_socket(socket: std::net::UdpSocket) -> std::io::Result<Self> {
47-
let socket = UdpSocket::from_std(socket)?;
48-
Ok(Provider {
49-
socket,
50-
socket_recv_buffer: vec![0; super::RECEIVE_BUFFER_SIZE],
51-
next_packet_out: None,
52-
})
53-
}
54-
55-
fn poll_send_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
56-
let (data, addr) = match self.next_packet_out.as_ref() {
57-
Some(pending) => pending,
58-
None => return Poll::Ready(Ok(())),
59-
};
60-
match self.socket.poll_send_to(cx, data.as_slice(), *addr) {
61-
Poll::Ready(result) => {
62-
self.next_packet_out = None;
63-
Poll::Ready(result.map(|_| ()))
64-
}
65-
Poll::Pending => Poll::Pending,
66-
}
67-
}
68-
69-
fn poll_recv_from(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<(Vec<u8>, SocketAddr)>> {
70-
let Self {
71-
socket,
72-
socket_recv_buffer,
73-
..
74-
} = self;
75-
let mut read_buf = ReadBuf::new(socket_recv_buffer.as_mut_slice());
76-
let packet_src = ready!(socket.poll_recv_from(cx, &mut read_buf)?);
77-
let bytes = read_buf.filled().to_vec();
78-
Poll::Ready(Ok((bytes, packet_src)))
79-
}
80-
81-
fn start_send(&mut self, data: Vec<u8>, addr: SocketAddr) {
82-
self.next_packet_out = Some((data, addr));
83-
}
84-
85-
fn spawn(future: impl Future<Output = ()> + Send + 'static) {
86-
tokio::spawn(future);
40+
fn runtime() -> super::Runtime {
41+
super::Runtime::Tokio
8742
}
8843

8944
fn new_if_watcher() -> io::Result<Self::IfWatcher> {
@@ -100,4 +55,16 @@ impl super::Provider for Provider {
10055
fn sleep(duration: Duration) -> BoxFuture<'static, ()> {
10156
tokio::time::sleep(duration).boxed()
10257
}
58+
59+
fn send_to<'a>(
60+
udp_socket: &'a UdpSocket,
61+
buf: &'a [u8],
62+
target: SocketAddr,
63+
) -> BoxFuture<'a, io::Result<usize>> {
64+
Box::pin(async move {
65+
tokio::net::UdpSocket::from_std(udp_socket.try_clone()?)?
66+
.send_to(buf, target)
67+
.await
68+
})
69+
}
10370
}

‎transports/quic/src/transport.rs

+140-240
Large diffs are not rendered by default.

‎transports/quic/tests/smoke.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ async fn smoke<P: Provider>() {
428428
assert_eq!(b_connected, a_peer_id);
429429
}
430430

431-
async fn build_streams<P: Provider>() -> (SubstreamBox, SubstreamBox) {
431+
async fn build_streams<P: Provider + Spawn>() -> (SubstreamBox, SubstreamBox) {
432432
let (_, mut a_transport) = create_default_transport::<P>();
433433
let (_, mut b_transport) = create_default_transport::<P>();
434434

@@ -522,7 +522,7 @@ async fn start_listening(transport: &mut Boxed<(PeerId, StreamMuxerBox)>, addr:
522522
}
523523
}
524524

525-
fn prop<P: Provider + BlockOn>(
525+
fn prop<P: Provider + BlockOn + Spawn>(
526526
number_listeners: NonZeroU8,
527527
number_streams: NonZeroU8,
528528
) -> quickcheck::TestResult {
@@ -599,7 +599,7 @@ fn prop<P: Provider + BlockOn>(
599599
quickcheck::TestResult::passed()
600600
}
601601

602-
async fn answer_inbound_streams<P: Provider, const BUFFER_SIZE: usize>(
602+
async fn answer_inbound_streams<P: Provider + Spawn, const BUFFER_SIZE: usize>(
603603
mut connection: StreamMuxerBox,
604604
) {
605605
loop {
@@ -634,7 +634,7 @@ async fn answer_inbound_streams<P: Provider, const BUFFER_SIZE: usize>(
634634
}
635635
}
636636

637-
async fn open_outbound_streams<P: Provider, const BUFFER_SIZE: usize>(
637+
async fn open_outbound_streams<P: Provider + Spawn, const BUFFER_SIZE: usize>(
638638
mut connection: StreamMuxerBox,
639639
number_streams: usize,
640640
completed_streams_tx: mpsc::Sender<()>,
@@ -740,3 +740,22 @@ impl BlockOn for libp2p_quic::tokio::Provider {
740740
.unwrap()
741741
}
742742
}
743+
744+
trait Spawn {
745+
/// Run the given future in the background until it ends.
746+
fn spawn(future: impl Future<Output = ()> + Send + 'static);
747+
}
748+
749+
#[cfg(feature = "async-std")]
750+
impl Spawn for libp2p_quic::async_std::Provider {
751+
fn spawn(future: impl Future<Output = ()> + Send + 'static) {
752+
async_std::task::spawn(future);
753+
}
754+
}
755+
756+
#[cfg(feature = "tokio")]
757+
impl Spawn for libp2p_quic::tokio::Provider {
758+
fn spawn(future: impl Future<Output = ()> + Send + 'static) {
759+
tokio::spawn(future);
760+
}
761+
}

0 commit comments

Comments
 (0)
Please sign in to comment.