Skip to content

Make async-std compile for wasm32-unknown-unknown #225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -32,6 +32,14 @@ matrix:
- rust: nightly-x86_64-pc-windows-msvc
os: windows

- name: wasm
rust: nightly
os: linux
before_script: |
rustup target add wasm32-unknown-unknown
script:
- cargo check --target wasm32-unknown-unknown --features unstable --all --benches --bins --examples --tests

- name: fmt
rust: nightly
os: linux
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -36,13 +36,15 @@ futures-timer = "0.4.0"
lazy_static = "1.4.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1"
mio = "0.6.19"
mio-uds = "0.6.7"
num_cpus = "1.10.1"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
mio = "0.6.19"
mio-uds = "0.6.7"

[dev-dependencies]
femme = "1.2.0"
surf = "1.0.2"
2 changes: 2 additions & 0 deletions src/net/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg(not(target_os = "unknown"))]

use std::fmt;
use std::sync::{Arc, Mutex};

142 changes: 104 additions & 38 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
@@ -4,13 +4,20 @@ use std::pin::Pin;
use cfg_if::cfg_if;

use super::TcpStream;
use crate::future::{self, Future};
use crate::future::Future;
use crate::io;
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::stream::Stream;
use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
use crate::future;
use crate::net::driver::Watcher;
use super::stream::Inner as TcpStreamInner;
}
}

/// A TCP socket server, listening for connections.
///
/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming
@@ -50,7 +57,19 @@ use crate::task::{Context, Poll};
/// ```
#[derive(Debug)]
pub struct TcpListener {
watcher: Watcher<mio::net::TcpListener>,
inner: Inner,
}

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
#[derive(Debug)]
struct Inner {
watcher: Watcher<mio::net::TcpListener>,
}
} else {
#[derive(Debug)]
struct Inner;
}
}

impl TcpListener {
@@ -76,25 +95,7 @@ impl TcpListener {
///
/// [`local_addr`]: #method.local_addr
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
let mut last_err = None;

for addr in addrs.to_socket_addrs().await? {
match mio::net::TcpListener::bind(&addr) {
Ok(mio_listener) => {
return Ok(TcpListener {
watcher: Watcher::new(mio_listener),
});
}
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
bind(addrs).await
}

/// Accepts a new incoming connection to this listener.
@@ -114,15 +115,7 @@ impl TcpListener {
/// # Ok(()) }) }
/// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (io, addr) =
future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std()))
.await?;

let mio_stream = mio::net::TcpStream::from_stream(io)?;
let stream = TcpStream {
watcher: Watcher::new(mio_stream),
};
Ok((stream, addr))
accept(self).await
}

/// Returns a stream of incoming connections.
@@ -173,7 +166,7 @@ impl TcpListener {
/// # Ok(()) }) }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.watcher.get_ref().local_addr()
local_addr(self)
}
}

@@ -206,10 +199,7 @@ impl<'a> Stream for Incoming<'a> {
impl From<std::net::TcpListener> for TcpListener {
/// Converts a `std::net::TcpListener` into its asynchronous equivalent.
fn from(listener: std::net::TcpListener) -> TcpListener {
let mio_listener = mio::net::TcpListener::from_std(listener).unwrap();
TcpListener {
watcher: Watcher::new(mio_listener),
}
from(listener)
}
}

@@ -229,7 +219,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.watcher.get_ref().as_raw_fd()
self.inner.watcher.get_ref().as_raw_fd()
}
}

@@ -241,7 +231,7 @@ cfg_if! {

impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_inner().into_raw_fd()
self.inner.watcher.into_inner().into_raw_fd()
}
}
}
@@ -269,3 +259,79 @@ cfg_if! {
// }
}
}

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
let mut last_err = None;

for addr in addrs.to_socket_addrs().await? {
match mio::net::TcpListener::bind(&addr) {
Ok(mio_listener) => {
return Ok(TcpListener {
inner: Inner {
watcher: Watcher::new(mio_listener),
},
});
}
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}

async fn accept(listener: &TcpListener) -> io::Result<(TcpStream, SocketAddr)> {
let (io, addr) =
future::poll_fn(|cx| listener.inner.watcher.poll_read_with(cx, |inner| inner.accept_std()))
.await?;

let mio_stream = mio::net::TcpStream::from_stream(io)?;
let stream = TcpStream {
inner: TcpStreamInner {
watcher: Watcher::new(mio_stream),
},
};
Ok((stream, addr))
}

fn local_addr(listener: &TcpListener) -> io::Result<SocketAddr> {
listener.inner.watcher.get_ref().local_addr()
}

fn from(listener: std::net::TcpListener) -> TcpListener {
let mio_listener = mio::net::TcpListener::from_std(listener).unwrap();
TcpListener {
inner: Inner {
watcher: Watcher::new(mio_listener),
},
}
}

} else {
async fn bind<A: ToSocketAddrs>(_: A) -> io::Result<TcpListener> {
Err(io::Error::new(
io::ErrorKind::Other,
"TCP sockets unsupported on this platform",
))
}

async fn accept(_: &TcpListener) -> io::Result<(TcpStream, SocketAddr)> {
unreachable!()
}

fn local_addr(_: &TcpListener) -> io::Result<SocketAddr> {
unreachable!()
}

fn from(_: std::net::TcpListener) -> TcpListener {
// We can never successfully build a `std::net::TcpListener` on an unknown OS.
unreachable!()
}
}
}
279 changes: 212 additions & 67 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use std::io::{IoSlice, IoSliceMut, Read as _, Write as _};
use std::io::{IoSlice, IoSliceMut};
use std::net::SocketAddr;
use std::pin::Pin;

use cfg_if::cfg_if;

use crate::future;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::task::blocking;
use crate::task::{Context, Poll};

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
use std::io::{Read as _, Write as _};

use crate::future;
use crate::net::driver::Watcher;
use crate::task::blocking;
}
}

/// A TCP stream between a local and a remote socket.
///
/// A `TcpStream` can either be created by connecting to an endpoint, via the [`connect`] method,
@@ -49,7 +56,19 @@ use crate::task::{Context, Poll};
/// ```
#[derive(Debug)]
pub struct TcpStream {
pub(super) watcher: Watcher<mio::net::TcpStream>,
pub(super) inner: Inner,
}

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
#[derive(Debug)]
pub(super) struct Inner {
pub(super) watcher: Watcher<mio::net::TcpStream>,
}
} else {
#[derive(Debug)]
pub(super) struct Inner;
}
}

impl TcpStream {
@@ -73,30 +92,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
let mut last_err = None;

for addr in addrs.to_socket_addrs().await? {
let res = blocking::spawn(async move {
let std_stream = std::net::TcpStream::connect(addr)?;
let mio_stream = mio::net::TcpStream::from_stream(std_stream)?;
Ok(TcpStream {
watcher: Watcher::new(mio_stream),
})
})
.await;

match res {
Ok(stream) => return Ok(stream),
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
connect(addrs).await
}

/// Returns the local address that this stream is connected to.
@@ -114,7 +110,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.watcher.get_ref().local_addr()
local_addr(self)
}

/// Returns the remote address that this stream is connected to.
@@ -132,7 +128,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.watcher.get_ref().peer_addr()
peer_addr(self)
}

/// Gets the value of the `IP_TTL` option for this socket.
@@ -156,7 +152,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn ttl(&self) -> io::Result<u32> {
self.watcher.get_ref().ttl()
ttl(self)
}

/// Sets the value for the `IP_TTL` option on this socket.
@@ -179,7 +175,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
self.watcher.get_ref().set_ttl(ttl)
set_ttl(self, ttl)
}

/// Receives data on the socket from the remote address to which it is connected, without
@@ -205,7 +201,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await
peek(self, buf).await
}

/// Gets the value of the `TCP_NODELAY` option on this socket.
@@ -229,7 +225,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn nodelay(&self) -> io::Result<bool> {
self.watcher.get_ref().nodelay()
nodelay(self)
}

/// Sets the value of the `TCP_NODELAY` option on this socket.
@@ -255,7 +251,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
self.watcher.get_ref().set_nodelay(nodelay)
set_nodelay(self, nodelay)
}

/// Shuts down the read, write, or both halves of this connection.
@@ -280,7 +276,7 @@ impl TcpStream {
/// # Ok(()) }) }
/// ```
pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
self.watcher.get_ref().shutdown(how)
shutdown(self, how)
}
}

@@ -302,16 +298,6 @@ impl Read for TcpStream {
}
}

impl Read for &TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.watcher.poll_read_with(cx, |mut inner| inner.read(buf))
}
}

impl Write for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
@@ -338,32 +324,72 @@ impl Write for TcpStream {
}
}

impl Write for &TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.watcher
.poll_write_with(cx, |mut inner| inner.write(buf))
}
cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
impl Read for &TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.inner.watcher.poll_read_with(cx, |mut inner| inner.read(buf))
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.watcher.poll_write_with(cx, |mut inner| inner.flush())
}
impl Write for &TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.watcher
.poll_write_with(cx, |mut inner| inner.write(buf))
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner.watcher.poll_write_with(cx, |mut inner| inner.flush())
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

} else {
impl Read for &TcpStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}

impl Write for &TcpStream {
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}

fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
}
}

impl From<std::net::TcpStream> for TcpStream {
/// Converts a `std::net::TcpStream` into its asynchronous equivalent.
fn from(stream: std::net::TcpStream) -> TcpStream {
let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
TcpStream {
watcher: Watcher::new(mio_stream),
}
from(stream)
}
}

@@ -383,7 +409,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.watcher.get_ref().as_raw_fd()
self.inner.watcher.get_ref().as_raw_fd()
}
}

@@ -395,7 +421,7 @@ cfg_if! {

impl IntoRawFd for TcpStream {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_inner().into_raw_fd()
self.inner.watcher.into_inner().into_raw_fd()
}
}
}
@@ -423,3 +449,122 @@ cfg_if! {
// }
}
}

cfg_if! {
if #[cfg(not(target_os = "unknown"))] {
async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
let mut last_err = None;

for addr in addrs.to_socket_addrs().await? {
let res = blocking::spawn(async move {
let std_stream = std::net::TcpStream::connect(addr)?;
let mio_stream = mio::net::TcpStream::from_stream(std_stream)?;
Ok(TcpStream {
inner: Inner {
watcher: Watcher::new(mio_stream),
},
})
})
.await;

match res {
Ok(stream) => return Ok(stream),
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}

fn local_addr(socket: &TcpStream) -> io::Result<SocketAddr> {
socket.inner.watcher.get_ref().local_addr()
}

fn peer_addr(socket: &TcpStream) -> io::Result<SocketAddr> {
socket.inner.watcher.get_ref().peer_addr()
}

fn ttl(socket: &TcpStream) -> io::Result<u32> {
socket.inner.watcher.get_ref().ttl()
}

fn set_ttl(socket: &TcpStream, ttl: u32) -> io::Result<()> {
socket.inner.watcher.get_ref().set_ttl(ttl)
}

async fn peek(socket: &TcpStream, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| socket.inner.watcher.poll_read_with(cx, |inner| inner.peek(buf))).await
}

fn nodelay(socket: &TcpStream) -> io::Result<bool> {
socket.inner.watcher.get_ref().nodelay()
}

fn set_nodelay(socket: &TcpStream, nodelay: bool) -> io::Result<()> {
socket.inner.watcher.get_ref().set_nodelay(nodelay)
}

fn shutdown(socket: &TcpStream, how: std::net::Shutdown) -> std::io::Result<()> {
socket.inner.watcher.get_ref().shutdown(how)
}

fn from(stream: std::net::TcpStream) -> TcpStream {
let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
TcpStream {
inner: Inner {
watcher: Watcher::new(mio_stream),
},
}
}

} else {
async fn connect<A: ToSocketAddrs>(_: A) -> io::Result<TcpStream> {
Err(io::Error::new(
io::ErrorKind::Other,
"TCP sockets unsupported on this platform",
))
}

fn local_addr(_: &TcpStream) -> io::Result<SocketAddr> {
unreachable!()
}

fn peer_addr(_: &TcpStream) -> io::Result<SocketAddr> {
unreachable!()
}

fn ttl(_: &TcpStream) -> io::Result<u32> {
unreachable!()
}

fn set_ttl(_: &TcpStream, _: u32) -> io::Result<()> {
unreachable!()
}

async fn peek(_: &TcpStream, _: &mut [u8]) -> io::Result<usize> {
unreachable!()
}

fn nodelay(_: &TcpStream) -> io::Result<bool> {
unreachable!()
}

fn set_nodelay(_: &TcpStream, _: bool) -> io::Result<()> {
unreachable!()
}

fn shutdown(_: &TcpStream, _: std::net::Shutdown) -> std::io::Result<()> {
unreachable!()
}

fn from(_: std::net::TcpStream) -> TcpStream {
// We can never successfully build a `std::net::TcpStream` on an unknown OS.
unreachable!()
}
}
}
382 changes: 294 additions & 88 deletions src/net/udp/mod.rs

Large diffs are not rendered by default.