diff --git a/Cargo.toml b/Cargo.toml index 9e1790a9d7f..27658346475 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ tokio-io = "0.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] libp2p-dns = { version = "0.2.0", path = "./transports/dns" } libp2p-mdns = { version = "0.2.0", path = "./misc/mdns" } +libp2p-quic = { version = "0.1.0", path = "./transports/quic" } libp2p-tcp = { version = "0.2.0", path = "./transports/tcp" } [target.'cfg(any(target_os = "emscripten", target_os = "unknown"))'.dependencies] @@ -46,7 +47,10 @@ stdweb = { version = "0.4", default-features = false } [dev-dependencies] env_logger = "0.6.0" +openssl = "0.10" +quicli = "0.4" rand = "0.6" +structopt = "0.2" tokio = "0.1" tokio-stdin-stdout = "0.1" void = "1.0" @@ -71,6 +75,7 @@ members = [ "protocols/plaintext", "protocols/secio", "transports/dns", + "transports/quic", "transports/ratelimit", "transports/tcp", "transports/uds", diff --git a/core/src/lib.rs b/core/src/lib.rs index c0a8160c685..93ec1d5da2e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -87,7 +87,7 @@ pub use self::peer_id::PeerId; pub use self::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent}; pub use self::public_key::PublicKey; pub use self::swarm::Swarm; -pub use self::transport::Transport; +pub use self::transport::{Transport, TransportError}; pub use self::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName}; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] diff --git a/examples/chat-quic.rs b/examples/chat-quic.rs new file mode 100644 index 00000000000..d22e81de87c --- /dev/null +++ b/examples/chat-quic.rs @@ -0,0 +1,164 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use env_logger; +use futures::prelude::*; +use libp2p::{self, NetworkBehaviour, core::PublicKey, tokio_codec::{FramedRead, LinesCodec}}; +use openssl::{rsa::Rsa, x509::X509}; +use quicli::prelude::*; +use structopt::StructOpt; +use std::io; + +// Brief usage instructions: +// +// 1. Generate a private RSA key and self-signed certificate for the listener: +// $ openssl req -newkey rsa:4096 -nodes -keyout serverkey.pem -x509 -out servercert.pem +// +// 2. Generate a private RSA key and self-signed certificate for the dialer: +// $ openssl req -newkey rsa:4096 -nodes -keyout clientkey.pem -x509 -out clientcert.pem +// +// 3. Get the PeerId of the listener: +// $ cargo run --example chat-quic -- peer -c servercert.pem +// +// 4. Start the listener: +// $ cargo run --example chat-quic -- run -k serverkey.pem -c servercert.pem +// +// 5. Notice the listen port in the output of 4. and start the dialer by using the peer ID from 3. +// $ cargo run --example chat-quic -- run -k clientkey.pem -c clientcert.pem -d "/ip4/127.0.0.1/udp//quic/p2p/" + +#[derive(Debug, StructOpt)] +enum Cli { + #[structopt(name = "peer")] + PeerId(PeerId), + #[structopt(name = "run")] + Run(Run) +} + +#[derive(Debug, StructOpt)] +struct PeerId { + #[structopt(long = "cert", short = "c")] + cert: String +} + +#[derive(Debug, StructOpt)] +struct Run { + #[structopt(long = "key", short = "k")] + key: String, + + #[structopt(long = "cert", short = "c")] + cert: String, + + #[structopt(long = "dial", short = "d")] + dial: Option +} + +fn main() -> CliResult { + env_logger::init(); + + match Cli::from_args() { + Cli::PeerId(args) => peerid(args), + Cli::Run(args) => run(args) + } +} + +fn peerid(args: PeerId) -> CliResult { + let certificate = X509::from_pem(read_file(&args.cert)?.as_bytes())?; + let public_key = PublicKey::Rsa(certificate.public_key()?.public_key_to_der()?); + println!("Peer ID: {}", public_key.into_peer_id().to_base58()); + Ok(()) +} + +fn run(args: Run) -> CliResult { + let private_key = Rsa::private_key_from_pem(read_file(&args.key)?.as_bytes())?; + let certificate = X509::from_pem(read_file(&args.cert)?.as_bytes())?; + let public_key = PublicKey::Rsa(certificate.public_key()?.public_key_to_der()?); + + let rt = tokio::runtime::Runtime::new()?; + + let transport = libp2p_quic::QuicConfig::new(rt.executor(), &private_key, &certificate)?; + + let floodsub_topic = libp2p::floodsub::TopicBuilder::new("chat").build(); + + #[derive(NetworkBehaviour)] + struct MyBehaviour { + floodsub: libp2p::floodsub::Floodsub + } + + impl libp2p::core::swarm::NetworkBehaviourEventProcess for MyBehaviour { + // Called when `floodsub` produces an event. + fn inject_event(&mut self, message: libp2p::floodsub::FloodsubEvent) { + if let libp2p::floodsub::FloodsubEvent::Message(message) = message { + println!("Received: '{:?}' from {:?}", String::from_utf8_lossy(&message.data), message.source); + } + } + } + + // Create a Swarm to manage peers and events + let mut swarm = { + let mut behaviour = MyBehaviour { + floodsub: libp2p::floodsub::Floodsub::new(public_key.clone().into_peer_id()) + }; + behaviour.floodsub.subscribe(floodsub_topic.clone()); + libp2p::Swarm::new(transport, behaviour, libp2p::core::topology::MemoryTopology::empty(public_key)) + }; + + let addr = libp2p::Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/udp/0/quic".parse()?)?; + println!("Listening on {:?}", addr); + + // Reach out to another node if specified + if let Some(to_dial) = args.dial { + match to_dial.parse() { + Ok(addr) => { + match libp2p::Swarm::dial_addr(&mut swarm, addr) { + Ok(_) => println!("Dialed {:?}", to_dial), + Err(e) => println!("Dial {:?} failed: {:?}", to_dial, e) + } + } + Err(err) => println!("Failed to parse address to dial: {:?}", err) + } + } + + // Read full lines from stdin + let stdin = tokio_stdin_stdout::stdin(0); + let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + + // Kick it off + rt.block_on_all(futures::future::poll_fn(move || { + loop { + match framed_stdin.poll()? { + Async::Ready(Some(line)) => swarm.floodsub.publish(&floodsub_topic, line.as_bytes()), + Async::Ready(None) => return Err(io::Error::new(io::ErrorKind::Other, "stdin closed")), + Async::NotReady => break + }; + } + + loop { + match swarm.poll()? { + Async::Ready(Some(_)) => {} + Async::Ready(None) | Async::NotReady => break + } + } + + Ok(Async::NotReady) + }))?; + + Ok(()) +} + diff --git a/src/lib.rs b/src/lib.rs index 2392355da34..bffc6c37125 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -162,6 +162,9 @@ pub use libp2p_mdns as mdns; pub use libp2p_ping as ping; #[doc(inline)] pub use libp2p_plaintext as plaintext; +#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))] +#[doc(inline)] +pub use libp2p_quic as quic; #[doc(inline)] pub use libp2p_ratelimit as ratelimit; #[doc(inline)] diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml new file mode 100644 index 00000000000..98c1102da63 --- /dev/null +++ b/transports/quic/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "libp2p-quic" +version = "0.1.0" +authors = ["Parity Technologies "] +license = "MIT" +edition = "2018" + +[dependencies] +bytes = "0.4" +failure = "0.1" +libp2p-core = { path = "../../core" } +log = "0.4" +fnv = "1.0" +futures = "0.1" +multiaddr = { package = "parity-multiaddr", version = "0.1.0", path = "../../misc/multiaddr" } +multihash = { package = "parity-multihash", version = "0.1.0", path = "../../misc/multihash" } +openssl = "0.10" +parking_lot = "0.5" +picoquic = { git = "https://github.com/bkchr/picoquic-rs", rev = "60016b1be4c417a9c4e357201afdd3c42ca31e82" } +tokio-executor = "0.1" diff --git a/transports/quic/src/error.rs b/transports/quic/src/error.rs new file mode 100644 index 00000000000..e10b5374795 --- /dev/null +++ b/transports/quic/src/error.rs @@ -0,0 +1,78 @@ +use failure::Fail; +use openssl::error::ErrorStack; +use picoquic; +use std::{error::Error, fmt, io}; + +#[derive(Debug)] +pub struct QuicError { + kind: ErrorKind, + source: Option> +} + +#[derive(Debug)] +pub enum ErrorKind { + Io, + PicoQuic, + OpenSsl, + InvalidPeerId, + PeerIdMismatch, + #[doc(hidden)] + __Nonexhaustive +} + +impl Into for ErrorKind { + fn into(self) -> QuicError { + QuicError { kind: self, source: None } + } +} + +impl fmt::Display for QuicError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.kind { + ErrorKind::Io => write!(f, "i/o: {:?}", self.source), + ErrorKind::PicoQuic => write!(f, "picoquic: {:?}", self.source), + ErrorKind::OpenSsl => write!(f, "openssl: {:?}", self.source), + ErrorKind::InvalidPeerId => f.write_str("invalid peer ID"), + ErrorKind::PeerIdMismatch => f.write_str("peer IDs do not match"), + ErrorKind::__Nonexhaustive => f.write_str("__Nonexhaustive") + } + } +} + +impl Error for QuicError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + if let Some(ref s) = self.source { + Some(s.as_ref()) + } else { + None + } + } +} + +impl From for QuicError { + fn from(e: io::Error) -> Self { + QuicError { + kind: ErrorKind::Io, + source: Some(Box::new(e)) + } + } +} + +impl From for QuicError { + fn from(e: ErrorStack) -> Self { + QuicError { + kind: ErrorKind::OpenSsl, + source: Some(Box::new(e)) + } + } +} + +impl From for QuicError { + fn from(e: picoquic::Error) -> Self { + QuicError { + kind: ErrorKind::PicoQuic, + source: Some(Box::new(e.compat())) + } + } +} + diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs new file mode 100644 index 00000000000..d4cd178fdea --- /dev/null +++ b/transports/quic/src/lib.rs @@ -0,0 +1,492 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the libp2p `Transport` trait for QUIC over UDP. + +mod error; + +use bytes::BytesMut; +use crate::error::{QuicError, ErrorKind}; +use fnv::FnvHashMap; +use futures::{future::{self, FutureResult}, prelude::*}; +use libp2p_core::{muxing::Shutdown, PeerId, PublicKey, StreamMuxer, Transport, TransportError}; +use log::{debug, trace, warn}; +use multiaddr::{Multiaddr, Protocol}; +use multihash::Multihash; +use openssl::{error::ErrorStack, pkey::Private, rsa::Rsa, stack::StackRef, x509::{X509Ref, X509}}; +use parking_lot::Mutex; +use picoquic; +use std::{ + cmp, + fmt, io, iter, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + sync::Arc +}; +use tokio_executor::{Executor, SpawnError}; + +/// Represents the configuration for a QUIC transport capability for libp2p. +#[derive(Clone)] +pub struct QuicConfig { + /// task executor + executor: Exec, + /// RSA private key + private_key: Vec, + /// self-signed ceritficate + certificates: Vec>, + /// Address to use when establishing an outgoing IPv4 connection. Port can be 0 for "any port". + /// If the port is 0, it will be different for each outgoing connection. + ipv4_src_addr: SocketAddrV4, + /// Equivalent for `ipv4_src_addr` for IPv6. + ipv6_src_addr: SocketAddrV6, + /// picoquic context used for dialing (lazily initialised) + dialing_context: Arc>>, + /// The certificate verifier puts the public key of a dialed connection in here. + public_keys: Arc>>, + /// Should the peer ID be verified. + verify_peer_id: bool +} + +impl fmt::Debug for QuicConfig { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("QuicConfig") + .field("ipv4_src_addr", &self.ipv4_src_addr) + .field("ipv6_src_addr", &self.ipv6_src_addr) + .finish() + } +} + +impl QuicConfig { + /// Creates a new configuration object for QUIC. + pub fn new(e: E, key: &Rsa, cert: &X509) -> Result + where + E: Executor + Send + 'static + { + Ok(QuicConfig { + executor: Exec { inner: Arc::new(Mutex::new(e))}, + private_key: key.private_key_to_der()?, + certificates: vec![cert.to_der()?], + ipv4_src_addr: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0), + ipv6_src_addr: SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0), 0, 0, 0), + dialing_context: Arc::new(Mutex::new(None)), + public_keys: Arc::new(Mutex::new(Default::default())), + verify_peer_id: true + }) + } + + /// Sets the source port to use for outgoing connections. + /// + /// If 0, means a different port for each new connection. + pub fn source_port(mut self, port: u16) -> Self { + self.ipv4_src_addr.set_port(port); + self.ipv6_src_addr.set_port(port); + self + } + + /// Disable or enable peer ID verification. + /// + /// By default this setting is enabled and requires multi-addresses to end + /// whith `/p2p/`. If disabled, there is no + /// such requirement but the public key received from remote will not be + /// verified in any way, so there is no guarantee it actually is the public + /// key of the peer. + pub fn verify_peer_id(mut self, value: bool) -> Self { + self.verify_peer_id = value; + self + } + + fn set_dialing_context(&self, a: &SocketAddr) -> Result<(), QuicError> { + if self.dialing_context.lock().is_some() { + return Ok(()) + } + + let mut config = picoquic::Config::new(); + config.set_private_key(self.private_key.clone(), picoquic::FileFormat::DER); + config.set_certificate_chain(self.certificates.clone(), picoquic::FileFormat::DER); + config.set_verify_certificate_handler(PublicKeySaver(self.public_keys.clone())); + + *self.dialing_context.lock() = + Some(picoquic::Context::new(&a, self.executor.clone(), config)?); + + Ok(()) + } +} + +impl Transport for QuicConfig { + type Output = (PeerId, QuicMuxer); + type Error = QuicError; + type Listener = QuicListenStream; + type ListenerUpgrade = FutureResult; + type Dial = QuicDialFut; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError> { + let listen_addr = + if let Ok((sa, _)) = multiaddr_to_socketaddr(&addr) { + sa + } else { + return Err(TransportError::MultiaddrNotSupported(addr)) + }; + + let public_keys = Arc::new(Mutex::new(Default::default())); + + let mut config = picoquic::Config::new(); + config.set_private_key(self.private_key.clone(), picoquic::FileFormat::DER); + config.set_certificate_chain(self.certificates.clone(), picoquic::FileFormat::DER); + config.enable_client_authentication(); + config.set_verify_certificate_handler(PublicKeySaver(public_keys.clone())); + + let context = picoquic::Context::new(&listen_addr, self.executor.clone(), config) + .map_err(|e| TransportError::Other(e.into()))?; + + let actual_addr = socket_addr_to_quic(context.local_addr()); + debug!("Listening on {}; actual_addr = {}", listen_addr, actual_addr); + + Ok((QuicListenStream { inner: context, public_keys }, actual_addr)) + } + + fn dial(self, addr: Multiaddr) -> Result> { + let (target_addr, hash) = + match multiaddr_to_socketaddr(&addr) { + Ok(val) => val, + Err(_) => return Err(TransportError::MultiaddrNotSupported(addr)) + }; + + // As an optimization, we check that the address is not of the form `0.0.0.0`. + // If so, we instantly refuse dialing instead of going through the kernel. + if target_addr.port() == 0 || target_addr.ip().is_unspecified() { + debug!("Instantly refusing dialing {}, as it is invalid", addr); + return Err(TransportError::MultiaddrNotSupported(addr)) + } + + let listen_addr = if target_addr.is_ipv4() { + SocketAddr::from(self.ipv4_src_addr.clone()) + } else { + SocketAddr::from(self.ipv6_src_addr.clone()) + }; + + let peer_id = + if self.verify_peer_id { + if let Some(h) = hash { + Some(PeerId::from_multihash(h).map_err(|_| { + TransportError::Other(ErrorKind::InvalidPeerId.into()) + })?) + } else { + return Err(TransportError::Other(ErrorKind::InvalidPeerId.into())) + } + } else { + None + }; + + self.set_dialing_context(&listen_addr).map_err(TransportError::Other)?; + + let connection = self.dialing_context.lock() + .as_mut() + .expect("dialing context has been set one line above") + .new_connection(target_addr, String::new()); + + debug!("Dialing {}", addr); + + Ok(QuicDialFut { + peer_id, + context: Some(self.dialing_context.clone()), + connection, + public_keys: self.public_keys.clone() + }) + } + + fn nat_traversal(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + // TODO: implement after https://github.com/libp2p/rust-libp2p/pull/550 + None + } +} + +/// Wrapper around `Executor` to derive `Clone`. +#[derive(Clone)] +struct Exec { + inner: Arc> +} + +impl Executor for Exec { + fn spawn(&mut self, fut: Box + Send>) -> Result<(), SpawnError> { + self.inner.lock().spawn(fut) + } +} + +/// An open connection. Implements `StreamMuxer`. +pub struct QuicMuxer { + _context: Option>>>, + inner: Mutex +} + +impl fmt::Debug for QuicMuxer { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("QuicMuxer") + } +} + +/// A QUIC substream. +pub struct QuicMuxerSubstream { + /// The actual stream from picoquic. + inner: picoquic::Stream, + /// Data waiting to be read. + pending_read: BytesMut +} + +impl fmt::Debug for QuicMuxerSubstream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("QuicMuxerSubstream") + } +} + +/// A QUIC substream being opened. +pub struct QuicMuxerOutboundSubstream { + /// The actual stream from picoquic. + inner: picoquic::NewStreamFuture +} + +impl fmt::Debug for QuicMuxerOutboundSubstream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("QuicMuxerOutboundSubstream") + } +} + +impl StreamMuxer for QuicMuxer { + type Substream = QuicMuxerSubstream; + type OutboundSubstream = QuicMuxerOutboundSubstream; + + fn poll_inbound(&self) -> Poll, io::Error> { + match self.inner.lock().poll().map_err(convert_err)? { + Async::Ready(Some(substream)) => Ok(Async::Ready(Some(QuicMuxerSubstream { + inner: substream, + pending_read: BytesMut::new(), + }))), + Async::Ready(None) => Ok(Async::Ready(None)), + Async::NotReady => Ok(Async::NotReady), + } + } + + fn open_outbound(&self) -> Self::OutboundSubstream { + QuicMuxerOutboundSubstream { + inner: self.inner.lock().new_bidirectional_stream() + } + } + + fn poll_outbound(&self, sub: &mut Self::OutboundSubstream) -> Poll, io::Error> { + Ok(sub.inner.poll() + .map_err(convert_err)? + .map(|sub| { + Some(QuicMuxerSubstream { inner: sub, pending_read: BytesMut::new() }) + })) + } + + fn destroy_outbound(&self, _: Self::OutboundSubstream) {} + + fn read_substream(&self, sub: &mut Self::Substream, buf: &mut [u8]) -> Poll { + while sub.pending_read.is_empty() { + match sub.inner.poll().map_err(convert_err)? { + Async::Ready(Some(data)) => sub.pending_read = data, + Async::Ready(None) => return Ok(Async::Ready(0)), + Async::NotReady => return Ok(Async::NotReady) + } + } + let n = cmp::min(buf.len(), sub.pending_read.len()); + (&mut buf[.. n]).copy_from_slice(&sub.pending_read[.. n]); + sub.pending_read.advance(n); + Ok(Async::Ready(n)) + } + + fn write_substream(&self, sub: &mut Self::Substream, buf: &[u8]) -> Poll { + let len = buf.len(); + match sub.inner.start_send(buf.to_vec().into()).map_err(convert_err)? { + AsyncSink::Ready => Ok(Async::Ready(len)), + AsyncSink::NotReady(_) => Ok(Async::NotReady) + } + } + + fn flush_substream(&self, substream: &mut Self::Substream) -> Poll<(), io::Error> { + substream.inner.poll_complete().map_err(convert_err) + } + + fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } + + fn destroy_substream(&self, _: Self::Substream) {} + + fn shutdown(&self, _: Shutdown) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } + + fn flush_all(&self) -> Poll<(), io::Error> { + Ok(Async::Ready(())) + } +} + +/// If `addr` is a QUIC address, returns the corresponding `SocketAddr`. +fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<(SocketAddr, Option), ()> { + let mut iter = addr.iter(); + let proto1 = iter.next().ok_or(())?; + let proto2 = iter.next().ok_or(())?; + let proto3 = iter.next().ok_or(())?; + let proto4 = iter.next(); + + if iter.next().is_some() { + return Err(()); + } + + match (proto1, proto2, proto3, proto4) { + (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::Quic, None) => { + Ok((SocketAddr::new(ip.into(), port), None)) + } + (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::Quic, None) => { + Ok((SocketAddr::new(ip.into(), port), None)) + } + (Protocol::Ip4(ip), Protocol::Udp(port), Protocol::Quic, Some(Protocol::P2p(hash))) => { + Ok((SocketAddr::new(ip.into(), port), Some(hash))) + } + (Protocol::Ip6(ip), Protocol::Udp(port), Protocol::Quic, Some(Protocol::P2p(hash))) => { + Ok((SocketAddr::new(ip.into(), port), Some(hash))) + } + _ => Err(()), + } +} + +/// Converts a `SocketAddr` into a QUIC multiaddr. +fn socket_addr_to_quic(addr: SocketAddr) -> Multiaddr { + iter::once(Protocol::from(addr.ip())) + .chain(iter::once(Protocol::Udp(addr.port()))) + .chain(iter::once(Protocol::Quic)) + .collect() +} + +/// Future that dials an address. +#[must_use = "futures do nothing unless polled"] +pub struct QuicDialFut { + peer_id: Option, + context: Option>>>, + connection: picoquic::NewConnectionFuture, + public_keys: Arc>> +} + +impl fmt::Debug for QuicDialFut { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("QuicDialFut").field("peer_id", &self.peer_id).finish() + } +} + +impl Future for QuicDialFut { + type Item = (PeerId, QuicMuxer); + type Error = QuicError; + + fn poll(&mut self) -> Poll { + match self.connection.poll() { + Ok(Async::Ready(stream)) => { + let public_key = self.public_keys.lock() + .remove(&stream.id()) + .expect("picoquic calls certificate validator which saves the public key"); + let peer_id = public_key.into_peer_id(); + if let Some(ref id) = self.peer_id { + if id != &peer_id { + warn!("peer id mismatch: {:?} != {:?}", self.peer_id, peer_id); + return Err(ErrorKind::PeerIdMismatch.into()) + } + } + trace!("outgoing connection to {:?}", peer_id); + let muxer = QuicMuxer { + _context: self.context.take(), + inner: Mutex::new(stream) + }; + Ok(Async::Ready((peer_id, muxer))) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + warn!("dial error: {}", e); + Err(e.into()) + } + } + } +} + +/// Stream that listens on an TCP/IP address. +#[must_use = "futures do nothing unless polled"] +pub struct QuicListenStream { + inner: picoquic::Context, + public_keys: Arc>>, +} + +impl fmt::Debug for QuicListenStream { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "QuicListenStream") + } +} + +impl Stream for QuicListenStream { + type Item = (FutureResult<(PeerId, QuicMuxer), QuicError>, Multiaddr); + type Error = QuicError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.inner.poll() { + Ok(Async::Ready(Some(stream))) => { + let public_key = self.public_keys.lock() + .remove(&stream.id()) + .expect("picoquic calls certificate validator which saves the public key"); + let peer_id = public_key.into_peer_id(); + trace!("incoming connection to {:?}", peer_id); + let addr = socket_addr_to_quic(stream.peer_addr()); + let muxer = QuicMuxer { + _context: None, + inner: Mutex::new(stream) + }; + Ok(Async::Ready(Some((future::ok((peer_id, muxer)), addr)))) + } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + warn!("listen error: {}", e); + Err(e.into()) + } + } + } +} + +/// Implementation of `picoquic::VerifyCertificate` thas just saves the +/// peer ID of the given connection. +struct PublicKeySaver(Arc>>); + +impl picoquic::VerifyCertificate for PublicKeySaver { + fn verify( + &mut self, + id: picoquic::ConnectionId, + _: picoquic::ConnectionType, + cert: &X509Ref, + _: &StackRef + ) -> Result + { + let public_key = PublicKey::Rsa(cert.public_key()?.public_key_to_der()?); + self.0.lock().insert(id, public_key); + Ok(true) + } +} + +/// Converts a picoquic error into an IO error. +// TODO: eventually remove ; this is bad design +fn convert_err(error: picoquic::Error) -> io::Error { + io::Error::new(io::ErrorKind::Other, error.to_string()) +} +