Skip to content

Commit a714864

Browse files
authoredNov 17, 2022
feat: Add WebRTC transport (#2622)
Hey 👋 This is a WebRTC transport implemented in accordance w/ the [spec](libp2p/specs#412). It's based on the [webrtc-rs](https://github.com/webrtc-rs/webrtc) library. Resolves: #1066.
1 parent 43fdfe2 commit a714864

26 files changed

+4220
-3
lines changed
 

‎Cargo.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ full = [
4444
"wasm-bindgen",
4545
"wasm-ext",
4646
"wasm-ext-websocket",
47+
"webrtc",
4748
"websocket",
4849
"yamux",
4950
]
@@ -75,11 +76,12 @@ rsa = ["libp2p-core/rsa"]
7576
secp256k1 = ["libp2p-core/secp256k1"]
7677
serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"]
7778
tcp = ["dep:libp2p-tcp"]
78-
tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"]
79+
tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-webrtc?/tokio"]
7980
uds = ["dep:libp2p-uds"]
8081
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"]
8182
wasm-ext = ["dep:libp2p-wasm-ext"]
8283
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"]
84+
webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"]
8385
websocket = ["dep:libp2p-websocket"]
8486
yamux = ["dep:libp2p-yamux"]
8587

@@ -108,6 +110,7 @@ libp2p-request-response = { version = "0.23.0", path = "protocols/request-respon
108110
libp2p-swarm = { version = "0.41.0", path = "swarm" }
109111
libp2p-uds = { version = "0.37.0", path = "transports/uds", optional = true }
110112
libp2p-wasm-ext = { version = "0.38.0", path = "transports/wasm-ext", optional = true }
113+
libp2p-webrtc = { version = "0.1.0-alpha", path = "transports/webrtc", optional = true }
111114
libp2p-yamux = { version = "0.42.0", path = "muxers/yamux", optional = true }
112115
multiaddr = { version = "0.16.0" }
113116
parking_lot = "0.12.0"
@@ -168,7 +171,8 @@ members = [
168171
"transports/tcp",
169172
"transports/uds",
170173
"transports/websocket",
171-
"transports/wasm-ext"
174+
"transports/wasm-ext",
175+
"transports/webrtc"
172176
]
173177

174178
[[example]]

‎misc/prost-codec/CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
12
# 0.3.0 [unreleased]
23

4+
- Implement `From` trait for `std::io::Error`. See [PR 2622].
35
- Don't leak `prost` dependency in `Error` type. See [PR 3058].
46

7+
[PR 2622]: https://github.com/libp2p/rust-libp2p/pull/2622/
58
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058/
69

710
# 0.2.0

‎misc/prost-codec/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
2020
[dev-dependencies]
2121
prost-build = "0.11"
2222

23-
# Passing arguments to the docsrs builder in order to properly document cfg's.
23+
# Passing arguments to the docsrs builder in order to properly document cfg's.
2424
# More information: https://docs.rs/about/builds#cross-compiling
2525
[package.metadata.docs.rs]
2626
all-features = true

‎misc/prost-codec/src/lib.rs

+6
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,9 @@ impl<In, Out: Message + Default> Decoder for Codec<In, Out> {
6565
#[derive(thiserror::Error, Debug)]
6666
#[error("Failed to encode/decode message")]
6767
pub struct Error(#[from] std::io::Error);
68+
69+
impl From<Error> for std::io::Error {
70+
fn from(e: Error) -> Self {
71+
e.0
72+
}
73+
}

‎src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ pub use libp2p_uds as uds;
125125
#[cfg(feature = "wasm-ext")]
126126
#[doc(inline)]
127127
pub use libp2p_wasm_ext as wasm_ext;
128+
#[cfg(feature = "webrtc")]
129+
#[cfg_attr(docsrs, doc(cfg(feature = "webrtc")))]
130+
#[doc(inline)]
131+
pub use libp2p_webrtc as webrtc;
128132
#[cfg(feature = "websocket")]
129133
#[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))]
130134
#[doc(inline)]

‎transports/webrtc/Cargo.toml

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
[package]
2+
name = "libp2p-webrtc"
3+
version = "0.1.0-alpha"
4+
authors = ["Parity Technologies <admin@parity.io>"]
5+
description = "WebRTC transport for libp2p"
6+
repository = "https://github.com/libp2p/rust-libp2p"
7+
license = "MIT"
8+
edition = "2021"
9+
keywords = ["peer-to-peer", "libp2p", "networking"]
10+
categories = ["network-programming", "asynchronous"]
11+
12+
[dependencies]
13+
async-trait = "0.1"
14+
asynchronous-codec = "0.6.1"
15+
bytes = "1"
16+
futures = "0.3"
17+
futures-timer = "3"
18+
hex = "0.4"
19+
if-watch = "2.0"
20+
libp2p-core = { version = "0.38.0", path = "../../core" }
21+
libp2p-noise = { version = "0.41.0", path = "../../transports/noise" }
22+
log = "0.4"
23+
multihash = { version = "0.16", default-features = false, features = ["sha2"] }
24+
prost = "0.11"
25+
prost-codec = { version = "0.3.0", path = "../../misc/prost-codec" }
26+
rand = "0.8"
27+
rcgen = "0.9.3"
28+
serde = { version = "1.0", features = ["derive"] }
29+
stun = "0.4"
30+
thiserror = "1"
31+
tinytemplate = "1.2"
32+
tokio = { version = "1.19", features = ["net"], optional = true}
33+
tokio-util = { version = "0.7", features = ["compat"], optional = true }
34+
webrtc = { version = "0.6.0", optional = true }
35+
36+
[features]
37+
tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc"]
38+
pem = ["webrtc?/pem"]
39+
40+
[build-dependencies]
41+
prost-build = "0.11"
42+
43+
[dev-dependencies]
44+
anyhow = "1.0"
45+
env_logger = "0.9"
46+
hex-literal = "0.3"
47+
libp2p = { path = "../..", features = ["full"] }
48+
tokio = { version = "1.19", features = ["full"] }
49+
unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] }
50+
void = "1"
51+
52+
[[test]]
53+
name = "smoke"
54+
required-features = ["tokio"]
55+
56+
[[example]]
57+
name = "listen_ping"
58+
required-features = ["tokio"]

‎transports/webrtc/build.rs

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright 2022 Protocol Labs.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
fn main() {
22+
prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap();
23+
}
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use anyhow::Result;
2+
use futures::StreamExt;
3+
use libp2p::swarm::{keep_alive, NetworkBehaviour};
4+
use libp2p::Transport;
5+
use libp2p::{ping, Swarm};
6+
use libp2p_core::identity;
7+
use libp2p_core::muxing::StreamMuxerBox;
8+
use rand::thread_rng;
9+
use void::Void;
10+
11+
/// An example WebRTC server that will accept connections and run the ping protocol on them.
12+
#[tokio::main]
13+
async fn main() -> Result<()> {
14+
let mut swarm = create_swarm()?;
15+
16+
swarm.listen_on("/ip4/127.0.0.1/udp/0/webrtc".parse()?)?;
17+
18+
loop {
19+
let event = swarm.next().await.unwrap();
20+
eprintln!("New event: {event:?}")
21+
}
22+
}
23+
24+
fn create_swarm() -> Result<Swarm<Behaviour>> {
25+
let id_keys = identity::Keypair::generate_ed25519();
26+
let peer_id = id_keys.public().to_peer_id();
27+
let transport = libp2p_webrtc::tokio::Transport::new(
28+
id_keys,
29+
libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?,
30+
);
31+
32+
let transport = transport
33+
.map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn)))
34+
.boxed();
35+
36+
Ok(Swarm::with_tokio_executor(
37+
transport,
38+
Behaviour::default(),
39+
peer_id,
40+
))
41+
}
42+
43+
#[derive(NetworkBehaviour, Default)]
44+
#[behaviour(out_event = "Event", event_process = false)]
45+
struct Behaviour {
46+
ping: ping::Behaviour,
47+
keep_alive: keep_alive::Behaviour,
48+
}
49+
50+
#[derive(Debug)]
51+
#[allow(clippy::large_enum_variant)]
52+
enum Event {
53+
Ping(ping::Event),
54+
}
55+
56+
impl From<ping::Event> for Event {
57+
fn from(e: ping::Event) -> Self {
58+
Event::Ping(e)
59+
}
60+
}
61+
62+
impl From<Void> for Event {
63+
fn from(event: Void) -> Self {
64+
void::unreachable(event)
65+
}
66+
}

‎transports/webrtc/src/lib.rs

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
//! Implementation of the [`libp2p_core::Transport`] trait for WebRTC protocol without a signaling
22+
//! server.
23+
//!
24+
//! # Overview
25+
//!
26+
//! ## ICE
27+
//!
28+
//! RFCs: 8839, 8445 See also:
29+
//! <https://tools.ietf.org/id/draft-ietf-rtcweb-sdp-08.html#rfc.section.5.2.3>
30+
//!
31+
//! The WebRTC protocol uses ICE in order to establish a connection.
32+
//!
33+
//! In a typical ICE setup, there are two endpoints, called agents, that want to communicate. One
34+
//! of these two agents can be the local browser, while the other agent is the target of the
35+
//! connection.
36+
//!
37+
//! Even though in this specific context all we want is a simple client-server communication, it is
38+
//! helpful to keep in mind that ICE was designed to solve the problem of NAT traversal.
39+
//!
40+
//! The ICE workflow works as follows:
41+
//!
42+
//! - An "offerer" determines ways in which it could be accessible (either an
43+
//! IP address or through a relay using a TURN server), which are called "candidates". It then
44+
//! generates a small text payload in a format called SDP, that describes the request for a
45+
//! connection.
46+
//! - The offerer sends this SDP-encoded message to the answerer. The medium through which this
47+
//! exchange is done is out of scope of the ICE protocol.
48+
//! - The answerer then finds its own candidates, and generates an answer, again in the SDP format.
49+
//! This answer is sent back to the offerer.
50+
//! - Each agent then tries to connect to the remote's candidates.
51+
//!
52+
//! We pretend to send the offer to the remote agent (the target of the connection), then pretend
53+
//! that it has found a valid IP address for itself (i.e. a candidate), then pretend that the SDP
54+
//! answer containing this candidate has been sent back. This will cause the offerer to execute
55+
//! step 4: try to connect to the remote's candidate.
56+
//!
57+
//! ## TCP or UDP
58+
//!
59+
//! WebRTC by itself doesn't hardcode any specific protocol for media streams. Instead, it is the
60+
//! SDP message of the offerer that specifies which protocol to use. In our use case (one or more
61+
//! data channels), we know that the offerer will always request either TCP+DTLS+SCTP, or
62+
//! UDP+DTLS+SCTP.
63+
//!
64+
//! The implementation only supports UDP at the moment, so if the offerer requests TCP+DTLS+SCTP, it
65+
//! will not respond. Support for TCP may be added in the future (see
66+
//! <https://github.com/webrtc-rs/webrtc/issues/132>).
67+
//!
68+
//! ## DTLS+SCTP
69+
//!
70+
//! RFCs: 8841, 8832
71+
//!
72+
//! In both cases (TCP or UDP), the next layer is DTLS. DTLS is similar to the well-known TLS
73+
//! protocol, except that it doesn't guarantee ordering of delivery (as this is instead provided by
74+
//! the SCTP layer on top of DTLS). In other words, once the TCP or UDP connection is established,
75+
//! the browser will try to perform a DTLS handshake.
76+
//!
77+
//! During the ICE negotiation, each agent must include in its SDP packet a hash of the self-signed
78+
//! certificate that it will use during the DTLS handshake. In our use-case, where we try to
79+
//! hand-crate the SDP answer generated by the remote, this is problematic. A way to solve this
80+
//! is to make the hash a part of the remote's multiaddr. On the server side, we turn
81+
//! certificate verification off.
82+
83+
mod message_proto {
84+
#![allow(clippy::derive_partial_eq_without_eq)]
85+
86+
include!(concat!(env!("OUT_DIR"), "/webrtc.pb.rs"));
87+
}
88+
89+
#[cfg(feature = "tokio")]
90+
pub mod tokio;

‎transports/webrtc/src/message.proto

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
syntax = "proto2";
2+
3+
package webrtc.pb;
4+
5+
message Message {
6+
enum Flag {
7+
// The sender will no longer send messages on the stream.
8+
FIN = 0;
9+
// The sender will no longer read messages on the stream. Incoming data is
10+
// being discarded on receipt.
11+
STOP_SENDING = 1;
12+
// The sender abruptly terminates the sending part of the stream. The
13+
// receiver can discard any data that it already received on that stream.
14+
RESET = 2;
15+
}
16+
17+
optional Flag flag=1;
18+
19+
optional bytes message = 2;
20+
}
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use rand::{distributions::DistString, CryptoRng, Rng};
22+
use webrtc::peer_connection::certificate::RTCCertificate;
23+
24+
use crate::tokio::fingerprint::Fingerprint;
25+
26+
#[derive(Debug, Clone, PartialEq)]
27+
pub struct Certificate {
28+
inner: RTCCertificate,
29+
}
30+
31+
impl Certificate {
32+
/// Generate new certificate.
33+
///
34+
/// `_rng` argument is ignored for now. See <https://github.com/melekes/rust-libp2p/pull/12>.
35+
pub fn generate<R>(_rng: &mut R) -> Result<Self, Error>
36+
where
37+
R: CryptoRng + Rng,
38+
{
39+
let mut params = rcgen::CertificateParams::new(vec![
40+
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
41+
]);
42+
params.alg = &rcgen::PKCS_ECDSA_P256_SHA256;
43+
Ok(Self {
44+
inner: RTCCertificate::from_params(params).expect("default params to work"),
45+
})
46+
}
47+
48+
/// Returns SHA-256 fingerprint of this certificate.
49+
///
50+
/// # Panics
51+
///
52+
/// This function will panic if there's no fingerprint with the SHA-256 algorithm (see
53+
/// [`RTCCertificate::get_fingerprints`]).
54+
pub fn fingerprint(&self) -> Fingerprint {
55+
let fingerprints = self.inner.get_fingerprints();
56+
let sha256_fingerprint = fingerprints
57+
.iter()
58+
.find(|f| f.algorithm == "sha-256")
59+
.expect("a SHA-256 fingerprint");
60+
61+
Fingerprint::try_from_rtc_dtls(sha256_fingerprint).expect("we filtered by sha-256")
62+
}
63+
64+
/// Parses a certificate from the ASCII PEM format.
65+
///
66+
/// See [`RTCCertificate::from_pem`]
67+
#[cfg(feature = "pem")]
68+
pub fn from_pem(pem_str: &str) -> Result<Self, Error> {
69+
Ok(Self {
70+
inner: RTCCertificate::from_pem(pem_str).map_err(Kind::InvalidPEM)?,
71+
})
72+
}
73+
74+
/// Serializes the certificate (including the private key) in PKCS#8 format in PEM.
75+
///
76+
/// See [`RTCCertificate::serialize_pem`]
77+
#[cfg(feature = "pem")]
78+
pub fn serialize_pem(&self) -> String {
79+
self.inner.serialize_pem()
80+
}
81+
82+
/// Extract the [`RTCCertificate`] from this wrapper.
83+
///
84+
/// This function is `pub(crate)` to avoid leaking the `webrtc` dependency to our users.
85+
pub(crate) fn to_rtc_certificate(&self) -> RTCCertificate {
86+
self.inner.clone()
87+
}
88+
}
89+
90+
#[derive(thiserror::Error, Debug)]
91+
#[error("Failed to generate certificate")]
92+
pub struct Error(#[from] Kind);
93+
94+
#[derive(thiserror::Error, Debug)]
95+
enum Kind {
96+
#[error(transparent)]
97+
InvalidPEM(#[from] webrtc::Error),
98+
}
99+
100+
#[cfg(test)]
101+
mod test {
102+
#[cfg(feature = "pem")]
103+
use anyhow::Result;
104+
105+
#[cfg(feature = "pem")]
106+
#[test]
107+
fn test_certificate_serialize_pem_and_from_pem() -> Result<()> {
108+
use super::*;
109+
use rand::thread_rng;
110+
111+
let cert = Certificate::generate(&mut thread_rng()).unwrap();
112+
113+
let pem = cert.serialize_pem();
114+
let loaded_cert = Certificate::from_pem(&pem)?;
115+
116+
assert_eq!(loaded_cert, cert);
117+
118+
Ok(())
119+
}
120+
}
+299
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,299 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use futures::stream::FuturesUnordered;
22+
use futures::{
23+
channel::{
24+
mpsc,
25+
oneshot::{self, Sender},
26+
},
27+
lock::Mutex as FutMutex,
28+
StreamExt,
29+
{future::BoxFuture, ready},
30+
};
31+
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
32+
use webrtc::data::data_channel::DataChannel as DetachedDataChannel;
33+
use webrtc::data_channel::RTCDataChannel;
34+
use webrtc::peer_connection::RTCPeerConnection;
35+
36+
use std::task::Waker;
37+
use std::{
38+
pin::Pin,
39+
sync::Arc,
40+
task::{Context, Poll},
41+
};
42+
43+
use crate::tokio::{error::Error, substream, substream::Substream};
44+
45+
/// Maximum number of unprocessed data channels.
46+
/// See [`Connection::poll_inbound`].
47+
const MAX_DATA_CHANNELS_IN_FLIGHT: usize = 10;
48+
49+
/// A WebRTC connection, wrapping [`RTCPeerConnection`] and implementing [`StreamMuxer`] trait.
50+
pub struct Connection {
51+
/// [`RTCPeerConnection`] to the remote peer.
52+
///
53+
/// Uses futures mutex because used in async code (see poll_outbound and poll_close).
54+
peer_conn: Arc<FutMutex<RTCPeerConnection>>,
55+
56+
/// Channel onto which incoming data channels are put.
57+
incoming_data_channels_rx: mpsc::Receiver<Arc<DetachedDataChannel>>,
58+
59+
/// Future, which, once polled, will result in an outbound substream.
60+
outbound_fut: Option<BoxFuture<'static, Result<Arc<DetachedDataChannel>, Error>>>,
61+
62+
/// Future, which, once polled, will result in closing the entire connection.
63+
close_fut: Option<BoxFuture<'static, Result<(), Error>>>,
64+
65+
/// A list of futures, which, once completed, signal that a [`Substream`] has been dropped.
66+
drop_listeners: FuturesUnordered<substream::DropListener>,
67+
no_drop_listeners_waker: Option<Waker>,
68+
}
69+
70+
impl Unpin for Connection {}
71+
72+
impl Connection {
73+
/// Creates a new connection.
74+
pub(crate) async fn new(rtc_conn: RTCPeerConnection) -> Self {
75+
let (data_channel_tx, data_channel_rx) = mpsc::channel(MAX_DATA_CHANNELS_IN_FLIGHT);
76+
77+
Connection::register_incoming_data_channels_handler(
78+
&rtc_conn,
79+
Arc::new(FutMutex::new(data_channel_tx)),
80+
)
81+
.await;
82+
83+
Self {
84+
peer_conn: Arc::new(FutMutex::new(rtc_conn)),
85+
incoming_data_channels_rx: data_channel_rx,
86+
outbound_fut: None,
87+
close_fut: None,
88+
drop_listeners: FuturesUnordered::default(),
89+
no_drop_listeners_waker: None,
90+
}
91+
}
92+
93+
/// Registers a handler for incoming data channels.
94+
///
95+
/// NOTE: `mpsc::Sender` is wrapped in `Arc` because cloning a raw sender would make the channel
96+
/// unbounded. "The channel’s capacity is equal to buffer + num-senders. In other words, each
97+
/// sender gets a guaranteed slot in the channel capacity..."
98+
/// See <https://docs.rs/futures/latest/futures/channel/mpsc/fn.channel.html>
99+
async fn register_incoming_data_channels_handler(
100+
rtc_conn: &RTCPeerConnection,
101+
tx: Arc<FutMutex<mpsc::Sender<Arc<DetachedDataChannel>>>>,
102+
) {
103+
rtc_conn.on_data_channel(Box::new(move |data_channel: Arc<RTCDataChannel>| {
104+
log::debug!("Incoming data channel {}", data_channel.id());
105+
106+
let tx = tx.clone();
107+
108+
Box::pin(async move {
109+
data_channel.on_open({
110+
let data_channel = data_channel.clone();
111+
Box::new(move || {
112+
log::debug!("Data channel {} open", data_channel.id());
113+
114+
Box::pin(async move {
115+
let data_channel = data_channel.clone();
116+
let id = data_channel.id();
117+
match data_channel.detach().await {
118+
Ok(detached) => {
119+
let mut tx = tx.lock().await;
120+
if let Err(e) = tx.try_send(detached.clone()) {
121+
log::error!("Can't send data channel {}: {}", id, e);
122+
// We're not accepting data channels fast enough =>
123+
// close this channel.
124+
//
125+
// Ideally we'd refuse to accept a data channel
126+
// during the negotiation process, but it's not
127+
// possible with the current API.
128+
if let Err(e) = detached.close().await {
129+
log::error!(
130+
"Failed to close data channel {}: {}",
131+
id,
132+
e
133+
);
134+
}
135+
}
136+
}
137+
Err(e) => {
138+
log::error!("Can't detach data channel {}: {}", id, e);
139+
}
140+
};
141+
})
142+
})
143+
});
144+
})
145+
}));
146+
}
147+
}
148+
149+
impl StreamMuxer for Connection {
150+
type Substream = Substream;
151+
type Error = Error;
152+
153+
fn poll_inbound(
154+
mut self: Pin<&mut Self>,
155+
cx: &mut Context<'_>,
156+
) -> Poll<Result<Self::Substream, Self::Error>> {
157+
match ready!(self.incoming_data_channels_rx.poll_next_unpin(cx)) {
158+
Some(detached) => {
159+
log::trace!("Incoming substream {}", detached.stream_identifier());
160+
161+
let (substream, drop_listener) = Substream::new(detached);
162+
self.drop_listeners.push(drop_listener);
163+
if let Some(waker) = self.no_drop_listeners_waker.take() {
164+
waker.wake()
165+
}
166+
167+
Poll::Ready(Ok(substream))
168+
}
169+
None => {
170+
debug_assert!(
171+
false,
172+
"Sender-end of channel should be owned by `RTCPeerConnection`"
173+
);
174+
175+
Poll::Pending // Return `Pending` without registering a waker: If the channel is closed, we don't need to be called anymore.
176+
}
177+
}
178+
}
179+
180+
fn poll(
181+
mut self: Pin<&mut Self>,
182+
cx: &mut Context<'_>,
183+
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
184+
loop {
185+
match ready!(self.drop_listeners.poll_next_unpin(cx)) {
186+
Some(Ok(())) => {}
187+
Some(Err(e)) => {
188+
log::debug!("a DropListener failed: {e}")
189+
}
190+
None => {
191+
self.no_drop_listeners_waker = Some(cx.waker().clone());
192+
return Poll::Pending;
193+
}
194+
}
195+
}
196+
}
197+
198+
fn poll_outbound(
199+
mut self: Pin<&mut Self>,
200+
cx: &mut Context<'_>,
201+
) -> Poll<Result<Self::Substream, Self::Error>> {
202+
let peer_conn = self.peer_conn.clone();
203+
let fut = self.outbound_fut.get_or_insert(Box::pin(async move {
204+
let peer_conn = peer_conn.lock().await;
205+
206+
let data_channel = peer_conn.create_data_channel("", None).await?;
207+
208+
// No need to hold the lock during the DTLS handshake.
209+
drop(peer_conn);
210+
211+
log::trace!("Opening data channel {}", data_channel.id());
212+
213+
let (tx, rx) = oneshot::channel::<Arc<DetachedDataChannel>>();
214+
215+
// Wait until the data channel is opened and detach it.
216+
register_data_channel_open_handler(data_channel, tx).await;
217+
218+
// Wait until data channel is opened and ready to use
219+
match rx.await {
220+
Ok(detached) => Ok(detached),
221+
Err(e) => Err(Error::Internal(e.to_string())),
222+
}
223+
}));
224+
225+
match ready!(fut.as_mut().poll(cx)) {
226+
Ok(detached) => {
227+
self.outbound_fut = None;
228+
229+
log::trace!("Outbound substream {}", detached.stream_identifier());
230+
231+
let (substream, drop_listener) = Substream::new(detached);
232+
self.drop_listeners.push(drop_listener);
233+
if let Some(waker) = self.no_drop_listeners_waker.take() {
234+
waker.wake()
235+
}
236+
237+
Poll::Ready(Ok(substream))
238+
}
239+
Err(e) => {
240+
self.outbound_fut = None;
241+
Poll::Ready(Err(e))
242+
}
243+
}
244+
}
245+
246+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
247+
log::debug!("Closing connection");
248+
249+
let peer_conn = self.peer_conn.clone();
250+
let fut = self.close_fut.get_or_insert(Box::pin(async move {
251+
let peer_conn = peer_conn.lock().await;
252+
peer_conn.close().await?;
253+
254+
Ok(())
255+
}));
256+
257+
match ready!(fut.as_mut().poll(cx)) {
258+
Ok(()) => {
259+
self.incoming_data_channels_rx.close();
260+
self.close_fut = None;
261+
Poll::Ready(Ok(()))
262+
}
263+
Err(e) => {
264+
self.close_fut = None;
265+
Poll::Ready(Err(e))
266+
}
267+
}
268+
}
269+
}
270+
271+
pub(crate) async fn register_data_channel_open_handler(
272+
data_channel: Arc<RTCDataChannel>,
273+
data_channel_tx: Sender<Arc<DetachedDataChannel>>,
274+
) {
275+
data_channel.on_open({
276+
let data_channel = data_channel.clone();
277+
Box::new(move || {
278+
log::debug!("Data channel {} open", data_channel.id());
279+
280+
Box::pin(async move {
281+
let data_channel = data_channel.clone();
282+
let id = data_channel.id();
283+
match data_channel.detach().await {
284+
Ok(detached) => {
285+
if let Err(e) = data_channel_tx.send(detached.clone()) {
286+
log::error!("Can't send data channel {}: {:?}", id, e);
287+
if let Err(e) = detached.close().await {
288+
log::error!("Failed to close data channel {}: {}", id, e);
289+
}
290+
}
291+
}
292+
Err(e) => {
293+
log::error!("Can't detach data channel {}: {}", id, e);
294+
}
295+
};
296+
})
297+
})
298+
});
299+
}

‎transports/webrtc/src/tokio/error.rs

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use libp2p_core::PeerId;
22+
use thiserror::Error;
23+
24+
/// Error in WebRTC.
25+
#[derive(Error, Debug)]
26+
pub enum Error {
27+
#[error(transparent)]
28+
WebRTC(#[from] webrtc::Error),
29+
#[error("IO error")]
30+
Io(#[from] std::io::Error),
31+
#[error("failed to authenticate peer")]
32+
Authentication(#[from] libp2p_noise::NoiseError),
33+
34+
// Authentication errors.
35+
#[error("invalid peer ID (expected {expected}, got {got})")]
36+
InvalidPeerID { expected: PeerId, got: PeerId },
37+
38+
#[error("no active listeners, can not dial without a previous listen")]
39+
NoListeners,
40+
41+
#[error("UDP mux error: {0}")]
42+
UDPMux(std::io::Error),
43+
44+
#[error("internal error: {0} (see debug logs)")]
45+
Internal(String),
46+
}
+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use multihash::{Code, Hasher, Multihash, MultihashDigest};
22+
use webrtc::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint;
23+
24+
use std::fmt;
25+
26+
const SHA256: &str = "sha-256";
27+
28+
/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm.
29+
#[derive(Eq, PartialEq, Copy, Clone)]
30+
pub struct Fingerprint([u8; 32]);
31+
32+
impl Fingerprint {
33+
pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]);
34+
35+
#[cfg(test)]
36+
pub fn raw(bytes: [u8; 32]) -> Self {
37+
Self(bytes)
38+
}
39+
40+
/// Creates a fingerprint from a raw certificate.
41+
pub fn from_certificate(bytes: &[u8]) -> Self {
42+
let mut h = multihash::Sha2_256::default();
43+
h.update(bytes);
44+
45+
let mut bytes: [u8; 32] = [0; 32];
46+
bytes.copy_from_slice(h.finalize());
47+
48+
Fingerprint(bytes)
49+
}
50+
51+
/// Converts [`RTCDtlsFingerprint`] to [`Fingerprint`].
52+
pub fn try_from_rtc_dtls(fp: &RTCDtlsFingerprint) -> Option<Self> {
53+
if fp.algorithm != SHA256 {
54+
return None;
55+
}
56+
57+
let mut buf = [0; 32];
58+
hex::decode_to_slice(fp.value.replace(':', ""), &mut buf).ok()?;
59+
60+
Some(Self(buf))
61+
}
62+
63+
/// Converts [`type@Multihash`] to [`Fingerprint`].
64+
pub fn try_from_multihash(hash: Multihash) -> Option<Self> {
65+
if hash.code() != u64::from(Code::Sha2_256) {
66+
// Only support SHA256 for now.
67+
return None;
68+
}
69+
70+
let bytes = hash.digest().try_into().ok()?;
71+
72+
Some(Self(bytes))
73+
}
74+
75+
/// Converts this fingerprint to [`type@Multihash`].
76+
pub fn to_multihash(self) -> Multihash {
77+
Code::Sha2_256
78+
.wrap(&self.0)
79+
.expect("fingerprint's len to be 32 bytes")
80+
}
81+
82+
/// Formats this fingerprint as uppercase hex, separated by colons (`:`).
83+
///
84+
/// This is the format described in <https://www.rfc-editor.org/rfc/rfc4572#section-5>.
85+
pub fn to_sdp_format(self) -> String {
86+
self.0.map(|byte| format!("{:02X}", byte)).join(":")
87+
}
88+
89+
/// Returns the algorithm used (e.g. "sha-256").
90+
/// See <https://datatracker.ietf.org/doc/html/rfc8122#section-5>
91+
pub fn algorithm(&self) -> String {
92+
SHA256.to_owned()
93+
}
94+
}
95+
96+
impl fmt::Debug for Fingerprint {
97+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98+
f.write_str(&hex::encode(self.0))
99+
}
100+
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use super::*;
105+
106+
#[test]
107+
fn sdp_format() {
108+
let fp = Fingerprint::raw(hex_literal::hex!(
109+
"7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC"
110+
));
111+
112+
let sdp_format = fp.to_sdp_format();
113+
114+
assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC")
115+
}
116+
}

‎transports/webrtc/src/tokio/mod.rs

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
pub mod certificate;
22+
mod connection;
23+
mod error;
24+
mod fingerprint;
25+
mod req_res_chan;
26+
mod sdp;
27+
mod substream;
28+
mod transport;
29+
mod udp_mux;
30+
mod upgrade;
31+
32+
pub use certificate::Certificate;
33+
pub use connection::Connection;
34+
pub use error::Error;
35+
pub use transport::Transport;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use futures::{
22+
channel::{mpsc, oneshot},
23+
SinkExt, StreamExt,
24+
};
25+
26+
use std::{
27+
io,
28+
task::{Context, Poll},
29+
};
30+
31+
pub fn new<Req, Res>(capacity: usize) -> (Sender<Req, Res>, Receiver<Req, Res>) {
32+
let (sender, receiver) = mpsc::channel(capacity);
33+
34+
(
35+
Sender {
36+
inner: futures::lock::Mutex::new(sender),
37+
},
38+
Receiver { inner: receiver },
39+
)
40+
}
41+
42+
pub struct Sender<Req, Res> {
43+
inner: futures::lock::Mutex<mpsc::Sender<(Req, oneshot::Sender<Res>)>>,
44+
}
45+
46+
impl<Req, Res> Sender<Req, Res> {
47+
pub async fn send(&self, req: Req) -> io::Result<Res> {
48+
let (sender, receiver) = oneshot::channel();
49+
50+
self.inner
51+
.lock()
52+
.await
53+
.send((req, sender))
54+
.await
55+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
56+
let res = receiver
57+
.await
58+
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
59+
60+
Ok(res)
61+
}
62+
}
63+
64+
pub struct Receiver<Req, Res> {
65+
inner: mpsc::Receiver<(Req, oneshot::Sender<Res>)>,
66+
}
67+
68+
impl<Req, Res> Receiver<Req, Res> {
69+
pub fn poll_next_unpin(
70+
&mut self,
71+
cx: &mut Context<'_>,
72+
) -> Poll<Option<(Req, oneshot::Sender<Res>)>> {
73+
self.inner.poll_next_unpin(cx)
74+
}
75+
}

‎transports/webrtc/src/tokio/sdp.rs

+252
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use serde::Serialize;
22+
use tinytemplate::TinyTemplate;
23+
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
24+
25+
use std::net::{IpAddr, SocketAddr};
26+
27+
use crate::tokio::fingerprint::Fingerprint;
28+
29+
/// Creates the SDP answer used by the client.
30+
pub fn answer(
31+
addr: SocketAddr,
32+
server_fingerprint: &Fingerprint,
33+
client_ufrag: &str,
34+
) -> RTCSessionDescription {
35+
RTCSessionDescription::answer(render_description(
36+
SERVER_SESSION_DESCRIPTION,
37+
addr,
38+
server_fingerprint,
39+
client_ufrag,
40+
))
41+
.unwrap()
42+
}
43+
44+
/// Creates the SDP offer used by the server.
45+
///
46+
/// Certificate verification is disabled which is why we hardcode a dummy fingerprint here.
47+
pub fn offer(addr: SocketAddr, client_ufrag: &str) -> RTCSessionDescription {
48+
RTCSessionDescription::offer(render_description(
49+
CLIENT_SESSION_DESCRIPTION,
50+
addr,
51+
&Fingerprint::FF,
52+
client_ufrag,
53+
))
54+
.unwrap()
55+
}
56+
57+
// An SDP message that constitutes the offer.
58+
//
59+
// Main RFC: <https://datatracker.ietf.org/doc/html/rfc8866>
60+
// `sctp-port` and `max-message-size` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc8841>
61+
// `group` and `mid` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc9143>
62+
// `ice-ufrag`, `ice-pwd` and `ice-options` attrs RFC: <https://datatracker.ietf.org/doc/html/rfc8839>
63+
// `setup` attr RFC: <https://datatracker.ietf.org/doc/html/rfc8122>
64+
//
65+
// Short description:
66+
//
67+
// v=<protocol-version> -> always 0
68+
// o=<username> <sess-id> <sess-version> <nettype> <addrtype> <unicast-address>
69+
//
70+
// <username> identifies the creator of the SDP document. We are allowed to use dummy values
71+
// (`-` and `0.0.0.0` as <addrtype>) to remain anonymous, which we do. Note that "IN" means
72+
// "Internet".
73+
//
74+
// s=<session name>
75+
//
76+
// We are allowed to pass a dummy `-`.
77+
//
78+
// c=<nettype> <addrtype> <connection-address>
79+
//
80+
// Indicates the IP address of the remote.
81+
// Note that "IN" means "Internet".
82+
//
83+
// t=<start-time> <stop-time>
84+
//
85+
// Start and end of the validity of the session. `0 0` means that the session never expires.
86+
//
87+
// m=<media> <port> <proto> <fmt> ...
88+
//
89+
// A `m=` line describes a request to establish a certain protocol. The protocol in this line
90+
// (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be the same as the one in the offer.
91+
// We know that this is true because we tweak the offer to match the protocol. The `<fmt>`
92+
// component must always be `webrtc-datachannel` for WebRTC.
93+
// RFCs: 8839, 8866, 8841
94+
//
95+
// a=mid:<MID>
96+
//
97+
// Media ID - uniquely identifies this media stream (RFC9143).
98+
//
99+
// a=ice-options:ice2
100+
//
101+
// Indicates that we are complying with RFC8839 (as oppposed to the legacy RFC5245).
102+
//
103+
// a=ice-ufrag:<ICE user>
104+
// a=ice-pwd:<ICE password>
105+
//
106+
// ICE username and password, which are used for establishing and
107+
// maintaining the ICE connection. (RFC8839)
108+
// MUST match ones used by the answerer (server).
109+
//
110+
// a=fingerprint:sha-256 <fingerprint>
111+
//
112+
// Fingerprint of the certificate that the remote will use during the TLS
113+
// handshake. (RFC8122)
114+
//
115+
// a=setup:actpass
116+
//
117+
// The endpoint that is the offerer MUST use the setup attribute value of setup:actpass and be
118+
// prepared to receive a client_hello before it receives the answer.
119+
//
120+
// a=sctp-port:<value>
121+
//
122+
// The SCTP port (RFC8841)
123+
// Note it's different from the "m=" line port value, which indicates the port of the
124+
// underlying transport-layer protocol (UDP or TCP).
125+
//
126+
// a=max-message-size:<value>
127+
//
128+
// The maximum SCTP user message size (in bytes). (RFC8841)
129+
const CLIENT_SESSION_DESCRIPTION: &str = "v=0
130+
o=- 0 0 IN {ip_version} {target_ip}
131+
s=-
132+
c=IN {ip_version} {target_ip}
133+
t=0 0
134+
135+
m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel
136+
a=mid:0
137+
a=ice-options:ice2
138+
a=ice-ufrag:{ufrag}
139+
a=ice-pwd:{pwd}
140+
a=fingerprint:{fingerprint_algorithm} {fingerprint_value}
141+
a=setup:actpass
142+
a=sctp-port:5000
143+
a=max-message-size:16384
144+
";
145+
146+
// See [`CLIENT_SESSION_DESCRIPTION`].
147+
//
148+
// a=ice-lite
149+
//
150+
// A lite implementation is only appropriate for devices that will *always* be connected to
151+
// the public Internet and have a public IP address at which it can receive packets from any
152+
// correspondent. ICE will not function when a lite implementation is placed behind a NAT
153+
// (RFC8445).
154+
//
155+
// a=tls-id:<id>
156+
//
157+
// "TLS ID" uniquely identifies a TLS association.
158+
// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection
159+
// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations
160+
// never happen in our use case, we can simply put a random value and not care about
161+
// it. Note however that the TLS ID in the answer must be present if and only if the
162+
// offer contains one. (RFC8842)
163+
// TODO: is it true that renegotiations never happen? what about a connection closing?
164+
// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839).
165+
// XXX: but right now browsers don't send it.
166+
//
167+
// a=setup:passive
168+
//
169+
// "passive" indicates that the remote DTLS server will only listen for incoming
170+
// connections. (RFC5763)
171+
// The answerer (server) MUST not be located behind a NAT (RFC6135).
172+
//
173+
// The answerer MUST use either a setup attribute value of setup:active or setup:passive.
174+
// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until
175+
// the answerer is received, which adds additional latency. setup:active allows the answer and
176+
// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED.
177+
//
178+
// a=candidate:<foundation> <component-id> <transport> <priority> <connection-address> <port> <cand-type>
179+
//
180+
// A transport address for a candidate that can be used for connectivity checks (RFC8839).
181+
//
182+
// a=end-of-candidates
183+
//
184+
// Indicate that no more candidates will ever be sent (RFC8838).
185+
const SERVER_SESSION_DESCRIPTION: &str = "v=0
186+
o=- 0 0 IN {ip_version} {target_ip}
187+
s=-
188+
t=0 0
189+
a=ice-lite
190+
m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel
191+
c=IN {ip_version} {target_ip}
192+
a=mid:0
193+
a=ice-options:ice2
194+
a=ice-ufrag:{ufrag}
195+
a=ice-pwd:{pwd}
196+
a=fingerprint:{fingerprint_algorithm} {fingerprint_value}
197+
198+
a=setup:passive
199+
a=sctp-port:5000
200+
a=max-message-size:16384
201+
a=candidate:1 1 UDP 1 {target_ip} {target_port} typ host
202+
a=end-of-candidates
203+
";
204+
205+
/// Indicates the IP version used in WebRTC: `IP4` or `IP6`.
206+
#[derive(Serialize)]
207+
enum IpVersion {
208+
IP4,
209+
IP6,
210+
}
211+
212+
/// Context passed to the templating engine, which replaces the above placeholders (e.g.
213+
/// `{IP_VERSION}`) with real values.
214+
#[derive(Serialize)]
215+
struct DescriptionContext {
216+
pub ip_version: IpVersion,
217+
pub target_ip: IpAddr,
218+
pub target_port: u16,
219+
pub fingerprint_algorithm: String,
220+
pub fingerprint_value: String,
221+
pub ufrag: String,
222+
pub pwd: String,
223+
}
224+
225+
/// Renders a [`TinyTemplate`] description using the provided arguments.
226+
fn render_description(
227+
description: &str,
228+
addr: SocketAddr,
229+
fingerprint: &Fingerprint,
230+
ufrag: &str,
231+
) -> String {
232+
let mut tt = TinyTemplate::new();
233+
tt.add_template("description", description).unwrap();
234+
235+
let context = DescriptionContext {
236+
ip_version: {
237+
if addr.is_ipv4() {
238+
IpVersion::IP4
239+
} else {
240+
IpVersion::IP6
241+
}
242+
},
243+
target_ip: addr.ip(),
244+
target_port: addr.port(),
245+
fingerprint_algorithm: fingerprint.algorithm(),
246+
fingerprint_value: fingerprint.to_sdp_format(),
247+
// NOTE: ufrag is equal to pwd.
248+
ufrag: ufrag.to_owned(),
249+
pwd: ufrag.to_owned(),
250+
};
251+
tt.render("description", &context).unwrap()
252+
}
+295
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use asynchronous_codec::Framed;
22+
use bytes::Bytes;
23+
use futures::{channel::oneshot, prelude::*, ready};
24+
use tokio_util::compat::Compat;
25+
use webrtc::data::data_channel::{DataChannel, PollDataChannel};
26+
27+
use std::{
28+
io,
29+
pin::Pin,
30+
sync::Arc,
31+
task::{Context, Poll},
32+
};
33+
34+
use crate::message_proto::{message::Flag, Message};
35+
use crate::tokio::{
36+
substream::drop_listener::GracefullyClosed,
37+
substream::framed_dc::FramedDc,
38+
substream::state::{Closing, State},
39+
};
40+
41+
mod drop_listener;
42+
mod framed_dc;
43+
mod state;
44+
45+
/// Maximum length of a message.
46+
///
47+
/// "As long as message interleaving is not supported, the sender SHOULD limit the maximum message
48+
/// size to 16 KB to avoid monopolization."
49+
/// Source: <https://www.rfc-editor.org/rfc/rfc8831#name-transferring-user-data-on-a>
50+
const MAX_MSG_LEN: usize = 16384; // 16kiB
51+
/// Length of varint, in bytes.
52+
const VARINT_LEN: usize = 2;
53+
/// Overhead of the protobuf encoding, in bytes.
54+
const PROTO_OVERHEAD: usize = 5;
55+
/// Maximum length of data, in bytes.
56+
const MAX_DATA_LEN: usize = MAX_MSG_LEN - VARINT_LEN - PROTO_OVERHEAD;
57+
58+
pub use drop_listener::DropListener;
59+
60+
/// A substream on top of a WebRTC data channel.
61+
///
62+
/// To be a proper libp2p substream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well
63+
/// as support a half-closed state which we do by framing messages in a protobuf envelope.
64+
pub struct Substream {
65+
io: FramedDc,
66+
state: State,
67+
read_buffer: Bytes,
68+
/// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`.
69+
drop_notifier: Option<oneshot::Sender<GracefullyClosed>>,
70+
}
71+
72+
impl Substream {
73+
/// Returns a new `Substream` and a listener, which will notify the receiver when/if the substream
74+
/// is dropped.
75+
pub(crate) fn new(data_channel: Arc<DataChannel>) -> (Self, DropListener) {
76+
let (sender, receiver) = oneshot::channel();
77+
78+
let substream = Self {
79+
io: framed_dc::new(data_channel.clone()),
80+
state: State::Open,
81+
read_buffer: Bytes::default(),
82+
drop_notifier: Some(sender),
83+
};
84+
let listener = DropListener::new(framed_dc::new(data_channel), receiver);
85+
86+
(substream, listener)
87+
}
88+
89+
/// Gracefully closes the "read-half" of the substream.
90+
pub fn poll_close_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
91+
loop {
92+
match self.state.close_read_barrier()? {
93+
Some(Closing::Requested) => {
94+
ready!(self.io.poll_ready_unpin(cx))?;
95+
96+
self.io.start_send_unpin(Message {
97+
flag: Some(Flag::StopSending.into()),
98+
message: None,
99+
})?;
100+
self.state.close_read_message_sent();
101+
102+
continue;
103+
}
104+
Some(Closing::MessageSent) => {
105+
ready!(self.io.poll_flush_unpin(cx))?;
106+
107+
self.state.read_closed();
108+
109+
return Poll::Ready(Ok(()));
110+
}
111+
None => return Poll::Ready(Ok(())),
112+
}
113+
}
114+
}
115+
}
116+
117+
impl AsyncRead for Substream {
118+
fn poll_read(
119+
mut self: Pin<&mut Self>,
120+
cx: &mut Context<'_>,
121+
buf: &mut [u8],
122+
) -> Poll<io::Result<usize>> {
123+
loop {
124+
self.state.read_barrier()?;
125+
126+
if !self.read_buffer.is_empty() {
127+
let n = std::cmp::min(self.read_buffer.len(), buf.len());
128+
let data = self.read_buffer.split_to(n);
129+
buf[0..n].copy_from_slice(&data[..]);
130+
131+
return Poll::Ready(Ok(n));
132+
}
133+
134+
let Self {
135+
read_buffer,
136+
io,
137+
state,
138+
..
139+
} = &mut *self;
140+
141+
match ready!(io_poll_next(io, cx))? {
142+
Some((flag, message)) => {
143+
if let Some(flag) = flag {
144+
state.handle_inbound_flag(flag, read_buffer);
145+
}
146+
147+
debug_assert!(read_buffer.is_empty());
148+
if let Some(message) = message {
149+
*read_buffer = message.into();
150+
}
151+
}
152+
None => {
153+
state.handle_inbound_flag(Flag::Fin, read_buffer);
154+
return Poll::Ready(Ok(0));
155+
}
156+
}
157+
}
158+
}
159+
}
160+
161+
impl AsyncWrite for Substream {
162+
fn poll_write(
163+
mut self: Pin<&mut Self>,
164+
cx: &mut Context<'_>,
165+
buf: &[u8],
166+
) -> Poll<io::Result<usize>> {
167+
while self.state.read_flags_in_async_write() {
168+
// TODO: In case AsyncRead::poll_read encountered an error or returned None earlier, we will poll the
169+
// underlying I/O resource once more. Is that allowed? How about introducing a state IoReadClosed?
170+
171+
let Self {
172+
read_buffer,
173+
io,
174+
state,
175+
..
176+
} = &mut *self;
177+
178+
match io_poll_next(io, cx)? {
179+
Poll::Ready(Some((Some(flag), message))) => {
180+
// Read side is closed. Discard any incoming messages.
181+
drop(message);
182+
// But still handle flags, e.g. a `Flag::StopSending`.
183+
state.handle_inbound_flag(flag, read_buffer)
184+
}
185+
Poll::Ready(Some((None, message))) => drop(message),
186+
Poll::Ready(None) | Poll::Pending => break,
187+
}
188+
}
189+
190+
self.state.write_barrier()?;
191+
192+
ready!(self.io.poll_ready_unpin(cx))?;
193+
194+
let n = usize::min(buf.len(), MAX_DATA_LEN);
195+
196+
Pin::new(&mut self.io).start_send(Message {
197+
flag: None,
198+
message: Some(buf[0..n].into()),
199+
})?;
200+
201+
Poll::Ready(Ok(n))
202+
}
203+
204+
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
205+
self.io.poll_flush_unpin(cx).map_err(Into::into)
206+
}
207+
208+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
209+
loop {
210+
match self.state.close_write_barrier()? {
211+
Some(Closing::Requested) => {
212+
ready!(self.io.poll_ready_unpin(cx))?;
213+
214+
self.io.start_send_unpin(Message {
215+
flag: Some(Flag::Fin.into()),
216+
message: None,
217+
})?;
218+
self.state.close_write_message_sent();
219+
220+
continue;
221+
}
222+
Some(Closing::MessageSent) => {
223+
ready!(self.io.poll_flush_unpin(cx))?;
224+
225+
self.state.write_closed();
226+
let _ = self
227+
.drop_notifier
228+
.take()
229+
.expect("to not close twice")
230+
.send(GracefullyClosed {});
231+
232+
return Poll::Ready(Ok(()));
233+
}
234+
None => return Poll::Ready(Ok(())),
235+
}
236+
}
237+
}
238+
}
239+
240+
fn io_poll_next(
241+
io: &mut Framed<Compat<PollDataChannel>, prost_codec::Codec<Message>>,
242+
cx: &mut Context<'_>,
243+
) -> Poll<io::Result<Option<(Option<Flag>, Option<Vec<u8>>)>>> {
244+
match ready!(io.poll_next_unpin(cx))
245+
.transpose()
246+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
247+
{
248+
Some(Message { flag, message }) => {
249+
let flag = flag
250+
.map(|f| {
251+
Flag::from_i32(f).ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, ""))
252+
})
253+
.transpose()?;
254+
255+
Poll::Ready(Ok(Some((flag, message))))
256+
}
257+
None => Poll::Ready(Ok(None)),
258+
}
259+
}
260+
261+
#[cfg(test)]
262+
mod tests {
263+
use super::*;
264+
use asynchronous_codec::Encoder;
265+
use bytes::BytesMut;
266+
use prost::Message;
267+
use unsigned_varint::codec::UviBytes;
268+
269+
#[test]
270+
fn max_data_len() {
271+
// Largest possible message.
272+
let message = [0; MAX_DATA_LEN];
273+
274+
let protobuf = crate::message_proto::Message {
275+
flag: Some(crate::message_proto::message::Flag::Fin.into()),
276+
message: Some(message.to_vec()),
277+
};
278+
279+
let mut encoded_msg = BytesMut::new();
280+
protobuf
281+
.encode(&mut encoded_msg)
282+
.expect("BytesMut to have sufficient capacity.");
283+
assert_eq!(encoded_msg.len(), message.len() + PROTO_OVERHEAD);
284+
285+
let mut uvi = UviBytes::default();
286+
let mut dst = BytesMut::new();
287+
uvi.encode(encoded_msg.clone().freeze(), &mut dst).unwrap();
288+
289+
// Ensure the varint prefixed and protobuf encoded largest message is no longer than the
290+
// maximum limit specified in the libp2p WebRTC specification.
291+
assert_eq!(dst.len(), MAX_MSG_LEN);
292+
293+
assert_eq!(dst.len() - encoded_msg.len(), VARINT_LEN);
294+
}
295+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use futures::channel::oneshot;
22+
use futures::channel::oneshot::Canceled;
23+
use futures::{FutureExt, SinkExt};
24+
25+
use std::future::Future;
26+
use std::io;
27+
use std::pin::Pin;
28+
use std::task::{Context, Poll};
29+
30+
use crate::message_proto::{message::Flag, Message};
31+
use crate::tokio::substream::framed_dc::FramedDc;
32+
33+
#[must_use]
34+
pub struct DropListener {
35+
state: State,
36+
}
37+
38+
impl DropListener {
39+
pub fn new(stream: FramedDc, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
40+
let substream_id = stream.get_ref().stream_identifier();
41+
42+
Self {
43+
state: State::Idle {
44+
stream,
45+
receiver,
46+
substream_id,
47+
},
48+
}
49+
}
50+
}
51+
52+
enum State {
53+
/// The [`DropListener`] is idle and waiting to be activated.
54+
Idle {
55+
stream: FramedDc,
56+
receiver: oneshot::Receiver<GracefullyClosed>,
57+
substream_id: u16,
58+
},
59+
/// The stream got dropped and we are sending a reset flag.
60+
SendingReset {
61+
stream: FramedDc,
62+
},
63+
Flushing {
64+
stream: FramedDc,
65+
},
66+
/// Bad state transition.
67+
Poisoned,
68+
}
69+
70+
impl Future for DropListener {
71+
type Output = io::Result<()>;
72+
73+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74+
let state = &mut self.get_mut().state;
75+
76+
loop {
77+
match std::mem::replace(state, State::Poisoned) {
78+
State::Idle {
79+
stream,
80+
substream_id,
81+
mut receiver,
82+
} => match receiver.poll_unpin(cx) {
83+
Poll::Ready(Ok(GracefullyClosed {})) => {
84+
return Poll::Ready(Ok(()));
85+
}
86+
Poll::Ready(Err(Canceled)) => {
87+
log::info!("Substream {substream_id} dropped without graceful close, sending Reset");
88+
*state = State::SendingReset { stream };
89+
continue;
90+
}
91+
Poll::Pending => {
92+
*state = State::Idle {
93+
stream,
94+
substream_id,
95+
receiver,
96+
};
97+
return Poll::Pending;
98+
}
99+
},
100+
State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? {
101+
Poll::Ready(()) => {
102+
stream.start_send_unpin(Message {
103+
flag: Some(Flag::Reset.into()),
104+
message: None,
105+
})?;
106+
*state = State::Flushing { stream };
107+
continue;
108+
}
109+
Poll::Pending => {
110+
*state = State::SendingReset { stream };
111+
return Poll::Pending;
112+
}
113+
},
114+
State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? {
115+
Poll::Ready(()) => return Poll::Ready(Ok(())),
116+
Poll::Pending => {
117+
*state = State::Flushing { stream };
118+
return Poll::Pending;
119+
}
120+
},
121+
State::Poisoned => {
122+
unreachable!()
123+
}
124+
}
125+
}
126+
}
127+
}
128+
129+
/// Indicates that our substream got gracefully closed.
130+
pub struct GracefullyClosed {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use asynchronous_codec::Framed;
22+
use tokio_util::compat::Compat;
23+
use tokio_util::compat::TokioAsyncReadCompatExt;
24+
use webrtc::data::data_channel::{DataChannel, PollDataChannel};
25+
26+
use std::sync::Arc;
27+
28+
use super::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN};
29+
use crate::message_proto::Message;
30+
31+
pub type FramedDc = Framed<Compat<PollDataChannel>, prost_codec::Codec<Message>>;
32+
33+
pub fn new(data_channel: Arc<DataChannel>) -> FramedDc {
34+
let mut inner = PollDataChannel::new(data_channel);
35+
inner.set_read_buf_capacity(MAX_MSG_LEN);
36+
37+
let mut framed = Framed::new(
38+
inner.compat(),
39+
prost_codec::Codec::new(MAX_MSG_LEN - VARINT_LEN),
40+
);
41+
// If not set, `Framed` buffers up to 131kB of data before sending, which leads to "outbound
42+
// packet larger than maximum message size" error in webrtc-rs.
43+
framed.set_send_high_water_mark(MAX_DATA_LEN);
44+
framed
45+
}

‎transports/webrtc/src/tokio/substream/state.rs

+510
Large diffs are not rendered by default.

‎transports/webrtc/src/tokio/transport.rs

+604
Large diffs are not rendered by default.

‎transports/webrtc/src/tokio/udp_mux.rs

+577
Large diffs are not rendered by default.
+240
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
mod noise;
22+
23+
use futures::channel::oneshot;
24+
use futures::future::Either;
25+
use futures_timer::Delay;
26+
use libp2p_core::{identity, PeerId};
27+
use rand::distributions::Alphanumeric;
28+
use rand::{thread_rng, Rng};
29+
use webrtc::api::setting_engine::SettingEngine;
30+
use webrtc::api::APIBuilder;
31+
use webrtc::data::data_channel::DataChannel;
32+
use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
33+
use webrtc::dtls_transport::dtls_role::DTLSRole;
34+
use webrtc::ice::network_type::NetworkType;
35+
use webrtc::ice::udp_mux::UDPMux;
36+
use webrtc::ice::udp_network::UDPNetwork;
37+
use webrtc::peer_connection::configuration::RTCConfiguration;
38+
use webrtc::peer_connection::RTCPeerConnection;
39+
40+
use std::{net::SocketAddr, sync::Arc, time::Duration};
41+
42+
use crate::tokio::{error::Error, fingerprint::Fingerprint, sdp, substream::Substream, Connection};
43+
44+
/// Creates a new outbound WebRTC connection.
45+
pub async fn outbound(
46+
addr: SocketAddr,
47+
config: RTCConfiguration,
48+
udp_mux: Arc<dyn UDPMux + Send + Sync>,
49+
client_fingerprint: Fingerprint,
50+
server_fingerprint: Fingerprint,
51+
id_keys: identity::Keypair,
52+
) -> Result<(PeerId, Connection), Error> {
53+
log::debug!("new outbound connection to {addr})");
54+
55+
let (peer_connection, ufrag) = new_outbound_connection(addr, config, udp_mux).await?;
56+
57+
let offer = peer_connection.create_offer(None).await?;
58+
log::debug!("created SDP offer for outbound connection: {:?}", offer.sdp);
59+
peer_connection.set_local_description(offer).await?;
60+
61+
let answer = sdp::answer(addr, &server_fingerprint, &ufrag);
62+
log::debug!(
63+
"calculated SDP answer for outbound connection: {:?}",
64+
answer
65+
);
66+
peer_connection.set_remote_description(answer).await?; // This will start the gathering of ICE candidates.
67+
68+
let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
69+
let peer_id = noise::outbound(
70+
id_keys,
71+
data_channel,
72+
server_fingerprint,
73+
client_fingerprint,
74+
)
75+
.await?;
76+
77+
Ok((peer_id, Connection::new(peer_connection).await))
78+
}
79+
80+
/// Creates a new inbound WebRTC connection.
81+
pub async fn inbound(
82+
addr: SocketAddr,
83+
config: RTCConfiguration,
84+
udp_mux: Arc<dyn UDPMux + Send + Sync>,
85+
server_fingerprint: Fingerprint,
86+
remote_ufrag: String,
87+
id_keys: identity::Keypair,
88+
) -> Result<(PeerId, Connection), Error> {
89+
log::debug!("new inbound connection from {addr} (ufrag: {remote_ufrag})");
90+
91+
let peer_connection = new_inbound_connection(addr, config, udp_mux, &remote_ufrag).await?;
92+
93+
let offer = sdp::offer(addr, &remote_ufrag);
94+
log::debug!("calculated SDP offer for inbound connection: {:?}", offer);
95+
peer_connection.set_remote_description(offer).await?;
96+
97+
let answer = peer_connection.create_answer(None).await?;
98+
log::debug!("created SDP answer for inbound connection: {:?}", answer);
99+
peer_connection.set_local_description(answer).await?; // This will start the gathering of ICE candidates.
100+
101+
let data_channel = create_substream_for_noise_handshake(&peer_connection).await?;
102+
let client_fingerprint = get_remote_fingerprint(&peer_connection).await;
103+
let peer_id = noise::inbound(
104+
id_keys,
105+
data_channel,
106+
client_fingerprint,
107+
server_fingerprint,
108+
)
109+
.await?;
110+
111+
Ok((peer_id, Connection::new(peer_connection).await))
112+
}
113+
114+
async fn new_outbound_connection(
115+
addr: SocketAddr,
116+
config: RTCConfiguration,
117+
udp_mux: Arc<dyn UDPMux + Send + Sync>,
118+
) -> Result<(RTCPeerConnection, String), Error> {
119+
let ufrag = random_ufrag();
120+
let se = setting_engine(udp_mux, &ufrag, addr);
121+
122+
let connection = APIBuilder::new()
123+
.with_setting_engine(se)
124+
.build()
125+
.new_peer_connection(config)
126+
.await?;
127+
128+
Ok((connection, ufrag))
129+
}
130+
131+
async fn new_inbound_connection(
132+
addr: SocketAddr,
133+
config: RTCConfiguration,
134+
udp_mux: Arc<dyn UDPMux + Send + Sync>,
135+
ufrag: &str,
136+
) -> Result<RTCPeerConnection, Error> {
137+
let mut se = setting_engine(udp_mux, ufrag, addr);
138+
{
139+
se.set_lite(true);
140+
se.disable_certificate_fingerprint_verification(true);
141+
// Act as a DTLS server (one which waits for a connection).
142+
//
143+
// NOTE: removing this seems to break DTLS setup (both sides send `ClientHello` messages,
144+
// but none end up responding).
145+
se.set_answering_dtls_role(DTLSRole::Server)?;
146+
}
147+
148+
let connection = APIBuilder::new()
149+
.with_setting_engine(se)
150+
.build()
151+
.new_peer_connection(config)
152+
.await?;
153+
154+
Ok(connection)
155+
}
156+
157+
/// Generates a random ufrag and adds a prefix according to the spec.
158+
fn random_ufrag() -> String {
159+
format!(
160+
"libp2p+webrtc+v1/{}",
161+
thread_rng()
162+
.sample_iter(&Alphanumeric)
163+
.take(64)
164+
.map(char::from)
165+
.collect::<String>()
166+
)
167+
}
168+
169+
fn setting_engine(
170+
udp_mux: Arc<dyn UDPMux + Send + Sync>,
171+
ufrag: &str,
172+
addr: SocketAddr,
173+
) -> SettingEngine {
174+
let mut se = SettingEngine::default();
175+
176+
// Set both ICE user and password to our fingerprint because that's what the client is
177+
// expecting..
178+
se.set_ice_credentials(ufrag.to_owned(), ufrag.to_owned());
179+
180+
se.set_udp_network(UDPNetwork::Muxed(udp_mux.clone()));
181+
182+
// Allow detaching data channels.
183+
se.detach_data_channels();
184+
185+
// Set the desired network type.
186+
//
187+
// NOTE: if not set, a [`webrtc_ice::agent::Agent`] might pick a wrong local candidate
188+
// (e.g. IPv6 `[::1]` while dialing an IPv4 `10.11.12.13`).
189+
let network_type = match addr {
190+
SocketAddr::V4(_) => NetworkType::Udp4,
191+
SocketAddr::V6(_) => NetworkType::Udp6,
192+
};
193+
se.set_network_types(vec![network_type]);
194+
195+
se
196+
}
197+
198+
/// Returns the SHA-256 fingerprint of the remote.
199+
async fn get_remote_fingerprint(conn: &RTCPeerConnection) -> Fingerprint {
200+
let cert_bytes = conn.sctp().transport().get_remote_certificate().await;
201+
202+
Fingerprint::from_certificate(&cert_bytes)
203+
}
204+
205+
async fn create_substream_for_noise_handshake(
206+
conn: &RTCPeerConnection,
207+
) -> Result<Substream, Error> {
208+
// NOTE: the data channel w/ `negotiated` flag set to `true` MUST be created on both ends.
209+
let data_channel = conn
210+
.create_data_channel(
211+
"",
212+
Some(RTCDataChannelInit {
213+
negotiated: Some(0), // 0 is reserved for the Noise substream
214+
..RTCDataChannelInit::default()
215+
}),
216+
)
217+
.await?;
218+
219+
let (tx, rx) = oneshot::channel::<Arc<DataChannel>>();
220+
221+
// Wait until the data channel is opened and detach it.
222+
crate::tokio::connection::register_data_channel_open_handler(data_channel, tx).await;
223+
224+
let channel = match futures::future::select(rx, Delay::new(Duration::from_secs(10))).await {
225+
Either::Left((Ok(channel), _)) => channel,
226+
Either::Left((Err(_), _)) => {
227+
return Err(Error::Internal("failed to open data channel".to_owned()))
228+
}
229+
Either::Right(((), _)) => {
230+
return Err(Error::Internal(
231+
"data channel opening took longer than 10 seconds (see logs)".into(),
232+
))
233+
}
234+
};
235+
236+
let (substream, drop_listener) = Substream::new(channel);
237+
drop(drop_listener); // Don't care about cancelled substreams during initial handshake.
238+
239+
Ok(substream)
240+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// Copyright 2022 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a
4+
// copy of this software and associated documentation files (the "Software"),
5+
// to deal in the Software without restriction, including without limitation
6+
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7+
// and/or sell copies of the Software, and to permit persons to whom the
8+
// Software is furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14+
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18+
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19+
// DEALINGS IN THE SOFTWARE.
20+
21+
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
22+
use libp2p_core::{identity, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo};
23+
use libp2p_noise::{Keypair, NoiseConfig, X25519Spec};
24+
25+
use crate::tokio::fingerprint::Fingerprint;
26+
use crate::tokio::Error;
27+
28+
pub async fn inbound<T>(
29+
id_keys: identity::Keypair,
30+
stream: T,
31+
client_fingerprint: Fingerprint,
32+
server_fingerprint: Fingerprint,
33+
) -> Result<PeerId, Error>
34+
where
35+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
36+
{
37+
let dh_keys = Keypair::<X25519Spec>::new()
38+
.into_authentic(&id_keys)
39+
.unwrap();
40+
let noise = NoiseConfig::xx(dh_keys)
41+
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
42+
let info = noise.protocol_info().next().unwrap();
43+
// Note the roles are reversed because it allows the server (webrtc connection responder) to
44+
// send application data 0.5 RTT earlier.
45+
let (peer_id, mut channel) = noise
46+
.into_authenticated()
47+
.upgrade_outbound(stream, info)
48+
.await?;
49+
50+
channel.close().await?;
51+
52+
Ok(peer_id)
53+
}
54+
55+
pub async fn outbound<T>(
56+
id_keys: identity::Keypair,
57+
stream: T,
58+
server_fingerprint: Fingerprint,
59+
client_fingerprint: Fingerprint,
60+
) -> Result<PeerId, Error>
61+
where
62+
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
63+
{
64+
let dh_keys = Keypair::<X25519Spec>::new()
65+
.into_authentic(&id_keys)
66+
.unwrap();
67+
let noise = NoiseConfig::xx(dh_keys)
68+
.with_prologue(noise_prologue(client_fingerprint, server_fingerprint));
69+
let info = noise.protocol_info().next().unwrap();
70+
// Note the roles are reversed because it allows the server (webrtc connection responder) to
71+
// send application data 0.5 RTT earlier.
72+
let (peer_id, mut channel) = noise
73+
.into_authenticated()
74+
.upgrade_inbound(stream, info)
75+
.await?;
76+
77+
channel.close().await?;
78+
79+
Ok(peer_id)
80+
}
81+
82+
pub fn noise_prologue(client_fingerprint: Fingerprint, server_fingerprint: Fingerprint) -> Vec<u8> {
83+
let client = client_fingerprint.to_multihash().to_bytes();
84+
let server = server_fingerprint.to_multihash().to_bytes();
85+
const PREFIX: &[u8] = b"libp2p-webrtc-noise:";
86+
let mut out = Vec::with_capacity(PREFIX.len() + client.len() + server.len());
87+
out.extend_from_slice(PREFIX);
88+
out.extend_from_slice(&client);
89+
out.extend_from_slice(&server);
90+
out
91+
}
92+
93+
#[cfg(test)]
94+
mod tests {
95+
use super::*;
96+
use hex_literal::hex;
97+
98+
#[test]
99+
fn noise_prologue_tests() {
100+
let a = Fingerprint::raw(hex!(
101+
"3e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870"
102+
));
103+
let b = Fingerprint::raw(hex!(
104+
"30fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99"
105+
));
106+
107+
let prologue1 = noise_prologue(a, b);
108+
let prologue2 = noise_prologue(b, a);
109+
110+
assert_eq!(hex::encode(&prologue1), "6c69627032702d7765627274632d6e6f6973653a12203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99");
111+
assert_eq!(hex::encode(&prologue2), "6c69627032702d7765627274632d6e6f6973653a122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b9912203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870");
112+
}
113+
}

‎transports/webrtc/tests/smoke.rs

+486
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)
Please sign in to comment.