diff --git a/.travis.yml b/.travis.yml index caf523662e..957dccb596 100644 --- a/.travis.yml +++ b/.travis.yml @@ -64,7 +64,6 @@ matrix: - cargo build --manifest-path futures-core/Cargo.toml --no-default-features - cargo build --manifest-path futures-channel/Cargo.toml --no-default-features - cargo build --manifest-path futures-executor/Cargo.toml --no-default-features - - cargo build --manifest-path futures-io/Cargo.toml --no-default-features - cargo build --manifest-path futures-sink/Cargo.toml --no-default-features - cargo build --manifest-path futures-util/Cargo.toml --no-default-features diff --git a/Cargo.toml b/Cargo.toml index 430205d8ca..fbb8598321 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,6 @@ members = [ "futures-core", "futures-channel", "futures-executor", - "futures-io", "futures-select-macro", "futures-sink", "futures-util", diff --git a/futures-core/src/io.rs b/futures-core/src/io.rs new file mode 100644 index 0000000000..c034067fb9 --- /dev/null +++ b/futures-core/src/io.rs @@ -0,0 +1,647 @@ +//! Asynchronous I/O +//! +//! This module contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and +//! `AsyncBufRead` traits, the asynchronous analogs to +//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is +//! that these traits integrate with the asynchronous task system. +//! +//! All items of this module are only available when the `std` feature of this +//! library is activated, and it is activated by default. + +use std::cmp; +use std::io; +use std::ops::DerefMut; +use std::pin::Pin; +use std::ptr; +use std::task::{Context, Poll}; + +// Re-export some types from `std::io` so that users don't have to deal +// with conflicts when `use`ing `futures::io` and `std::io`. +pub use self::io::{ + Error as Error, + ErrorKind as ErrorKind, + Result as Result, + IoSlice as IoSlice, + IoSliceMut as IoSliceMut, + SeekFrom as SeekFrom, +}; + +/// A type used to conditionally initialize buffers passed to `AsyncRead` +/// methods, modeled after `std`. +#[derive(Debug)] +pub struct Initializer(bool); + +impl Initializer { + /// Returns a new `Initializer` which will zero out buffers. + #[inline] + pub fn zeroing() -> Initializer { + Initializer(true) + } + + /// Returns a new `Initializer` which will not zero out buffers. + /// + /// # Safety + /// + /// This method may only be called by `AsyncRead`ers which guarantee + /// that they will not read from the buffers passed to `AsyncRead` + /// methods, and that the return value of the method accurately reflects + /// the number of bytes that have been written to the head of the buffer. + #[inline] + pub unsafe fn nop() -> Initializer { + Initializer(false) + } + + /// Indicates if a buffer should be initialized. + #[inline] + pub fn should_initialize(&self) -> bool { + self.0 + } + + /// Initializes a buffer if necessary. + #[inline] + pub fn initialize(&self, buf: &mut [u8]) { + if self.should_initialize() { + unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } + } + } +} + +/// Read bytes asynchronously. +/// +/// This trait is analogous to the `std::io::Read` trait, but integrates +/// with the asynchronous task system. In particular, the `poll_read` +/// method, unlike `Read::read`, will automatically queue the current task +/// for wakeup and return if data is not yet available, rather than blocking +/// the calling thread. +pub trait AsyncRead { + /// Determines if this `AsyncRead`er can work with buffers of + /// uninitialized memory. + /// + /// The default implementation returns an initializer which will zero + /// buffers. + /// + /// # Safety + /// + /// This method is `unsafe` because and `AsyncRead`er could otherwise + /// return a non-zeroing `Initializer` from another `AsyncRead` type + /// without an `unsafe` block. + #[inline] + unsafe fn initializer(&self) -> Initializer { + Initializer::zeroing() + } + + /// Attempt to read from the `AsyncRead` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll>; + + /// Attempt to read from the `AsyncRead` into `bufs` using vectored + /// IO operations. + /// + /// This method is similar to `poll_read`, but allows data to be read + /// into multiple buffers using a single operation. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// By default, this method delegates to using `poll_read` on the first + /// buffer in `bufs`. Objects which support vectored IO should override + /// this method. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll> + { + if let Some(ref mut first_iovec) = bufs.get_mut(0) { + self.poll_read(cx, first_iovec) + } else { + // `bufs` is empty. + Poll::Ready(Ok(0)) + } + } +} + +/// Write bytes asynchronously. +/// +/// This trait is analogous to the `std::io::Write` trait, but integrates +/// with the asynchronous task system. In particular, the `poll_write` +/// method, unlike `Write::write`, will automatically queue the current task +/// for wakeup and return if data is not yet available, rather than blocking +/// the calling thread. +pub trait AsyncWrite { + /// Attempt to write bytes from `buf` into the object. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. + /// + /// If the object is not ready for writing, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// writable or is closed. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) + -> Poll>; + + /// Attempt to write bytes from `bufs` into the object using vectored + /// IO operations. + /// + /// This method is similar to `poll_write`, but allows data from multiple buffers to be written + /// using a single operation. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. + /// + /// If the object is not ready for writing, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// writable or is closed. + /// + /// By default, this method delegates to using `poll_write` on the first + /// buffer in `bufs`. Objects which support vectored IO should override + /// this method. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll> + { + if let Some(ref first_iovec) = bufs.get(0) { + self.poll_write(cx, &*first_iovec) + } else { + // `bufs` is empty. + Poll::Ready(Ok(0)) + } + } + + /// Attempt to flush the object, ensuring that any buffered data reach + /// their destination. + /// + /// On success, returns `Poll::Ready(Ok(()))`. + /// + /// If flushing cannot immediately complete, this method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make + /// progress towards flushing. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Attempt to close the object. + /// + /// On success, returns `Poll::Ready(Ok(()))`. + /// + /// If closing cannot immediately complete, this function returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make + /// progress towards closing. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +} + +/// Seek bytes asynchronously. +/// +/// This trait is analogous to the `std::io::Seek` trait, but integrates +/// with the asynchronous task system. In particular, the `poll_seek` +/// method, unlike `Seek::seek`, will automatically queue the current task +/// for wakeup and return if data is not yet available, rather than blocking +/// the calling thread. +pub trait AsyncSeek { + /// Attempt to seek to an offset, in bytes, in a stream. + /// + /// A seek beyond the end of a stream is allowed, but behavior is defined + /// by the implementation. + /// + /// If the seek operation completed successfully, + /// this method returns the new position from the start of the stream. + /// That position can be used later with [`SeekFrom::Start`]. + /// + /// # Errors + /// + /// Seeking to a negative offset is considered an error. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll>; +} + +/// Read bytes asynchronously. +/// +/// This trait is analogous to the `std::io::BufRead` trait, but integrates +/// with the asynchronous task system. In particular, the `poll_fill_buf` +/// method, unlike `BufRead::fill_buf`, will automatically queue the current task +/// for wakeup and return if data is not yet available, rather than blocking +/// the calling thread. +pub trait AsyncBufRead: AsyncRead { + /// Attempt to return the contents of the internal buffer, filling it with more data + /// from the inner reader if it is empty. + /// + /// On success, returns `Poll::Ready(Ok(buf))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`consume`] method to function properly. When calling this + /// method, none of the contents will be "read" in the sense that later + /// calling [`poll_read`] may return the same contents. As such, [`consume`] must + /// be called with the number of bytes that are consumed from this buffer to + /// ensure that the bytes are never returned twice. + /// + /// [`poll_read`]: AsyncRead::poll_read + /// [`consume`]: AsyncBufRead::consume + /// + /// An empty buffer returned indicates that the stream has reached EOF. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) + -> Poll>; + + /// Tells this buffer that `amt` bytes have been consumed from the buffer, + /// so they should no longer be returned in calls to [`poll_read`]. + /// + /// This function is a lower-level call. It needs to be paired with the + /// [`poll_fill_buf`] method to function properly. This function does + /// not perform any I/O, it simply informs this object that some amount of + /// its buffer, returned from [`poll_fill_buf`], has been consumed and should + /// no longer be returned. As such, this function may do odd things if + /// [`poll_fill_buf`] isn't called before calling it. + /// + /// The `amt` must be `<=` the number of bytes in the buffer returned by + /// [`poll_fill_buf`]. + /// + /// [`poll_read`]: AsyncRead::poll_read + /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf + fn consume(self: Pin<&mut Self>, amt: usize); +} + +macro_rules! deref_async_read { + () => { + unsafe fn initializer(&self) -> Initializer { + (**self).initializer() + } + + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + Pin::new(&mut **self).poll_read(cx, buf) + } + + fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll> + { + Pin::new(&mut **self).poll_read_vectored(cx, bufs) + } + } +} + +impl AsyncRead for Box { + deref_async_read!(); +} + +impl AsyncRead for &mut T { + deref_async_read!(); +} + +impl

AsyncRead for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncRead, +{ + unsafe fn initializer(&self) -> Initializer { + (**self).initializer() + } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + self.get_mut().as_mut().poll_read(cx, buf) + } + + fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll> + { + self.get_mut().as_mut().poll_read_vectored(cx, bufs) + } +} + +/// `unsafe` because the `io::Read` type must not access the buffer +/// before reading data into it. +macro_rules! unsafe_delegate_async_read_to_stdio { + () => { + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } + + fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + Poll::Ready(io::Read::read(&mut *self, buf)) + } + + fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) + -> Poll> + { + Poll::Ready(io::Read::read_vectored(&mut *self, bufs)) + } + } +} + +impl AsyncRead for &[u8] { + unsafe_delegate_async_read_to_stdio!(); +} + +impl AsyncRead for io::Repeat { + unsafe_delegate_async_read_to_stdio!(); +} + +impl AsyncRead for io::Empty { + unsafe_delegate_async_read_to_stdio!(); +} + +impl + Unpin> AsyncRead for io::Cursor { + unsafe_delegate_async_read_to_stdio!(); +} + +macro_rules! deref_async_write { + () => { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) + -> Poll> + { + Pin::new(&mut **self).poll_write(cx, buf) + } + + fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll> + { + Pin::new(&mut **self).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_close(cx) + } + } +} + +impl AsyncWrite for Box { + deref_async_write!(); +} + +impl AsyncWrite for &mut T { + deref_async_write!(); +} + +impl

AsyncWrite for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncWrite, +{ + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) + -> Poll> + { + self.get_mut().as_mut().poll_write(cx, buf) + } + + fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll> + { + self.get_mut().as_mut().poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().as_mut().poll_close(cx) + } +} + +macro_rules! delegate_async_write_to_stdio { + () => { + fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) + -> Poll> + { + Poll::Ready(io::Write::write(&mut *self, buf)) + } + + fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll> + { + Poll::Ready(io::Write::write_vectored(&mut *self, bufs)) + } + + fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(io::Write::flush(&mut *self)) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } + } +} + +impl + Unpin> AsyncWrite for io::Cursor { + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let position = self.position(); + let result = { + let out = (&mut *self).get_mut().as_mut(); + let pos = cmp::min(out.len() as u64, position) as usize; + io::Write::write(&mut &mut out[pos..], buf) + }; + if let Ok(offset) = result { + self.get_mut().set_position(position + offset as u64); + } + Poll::Ready(result) + } + + fn poll_write_vectored(self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) + -> Poll> + { + Poll::Ready(io::Write::write_vectored(&mut self.get_mut().get_mut().as_mut(), bufs)) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(io::Write::flush(&mut self.get_mut().get_mut().as_mut())) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +impl AsyncWrite for Vec { + delegate_async_write_to_stdio!(); +} + +impl AsyncWrite for io::Sink { + delegate_async_write_to_stdio!(); +} + +macro_rules! deref_async_seek { + () => { + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + Pin::new(&mut **self).poll_seek(cx, pos) + } + } +} + +impl AsyncSeek for Box { + deref_async_seek!(); +} + +impl AsyncSeek for &mut T { + deref_async_seek!(); +} + + +impl

AsyncSeek for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncSeek, +{ + fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + self.get_mut().as_mut().poll_seek(cx, pos) + } +} + +macro_rules! delegate_async_seek_to_stdio { + () => { + fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) + -> Poll> + { + Poll::Ready(io::Seek::seek(&mut *self, pos)) + } + } +} + +impl + Unpin> AsyncSeek for io::Cursor { + delegate_async_seek_to_stdio!(); +} + +macro_rules! deref_async_buf_read { + () => { + fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) + -> Poll> + { + Pin::new(&mut **self.get_mut()).poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut **self.get_mut()).consume(amt) + } + } +} + +impl AsyncBufRead for Box { + deref_async_buf_read!(); +} + +impl AsyncBufRead for &mut T { + deref_async_buf_read!(); +} + +impl

AsyncBufRead for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncBufRead, +{ + fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) + -> Poll> + { + self.get_mut().as_mut().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.get_mut().as_mut().consume(amt) + } +} + +macro_rules! delegate_async_buf_read_to_stdio { + () => { + fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) + -> Poll> + { + Poll::Ready(io::BufRead::fill_buf(self.get_mut())) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + io::BufRead::consume(self.get_mut(), amt) + } + } +} + +impl AsyncBufRead for &[u8] { + delegate_async_buf_read_to_stdio!(); +} + +impl AsyncBufRead for io::Empty { + delegate_async_buf_read_to_stdio!(); +} + +impl + Unpin> AsyncBufRead for io::Cursor { + delegate_async_buf_read_to_stdio!(); +} diff --git a/futures-core/src/lib.rs b/futures-core/src/lib.rs index 95331a2f29..9709a1bad1 100644 --- a/futures-core/src/lib.rs +++ b/futures-core/src/lib.rs @@ -27,3 +27,8 @@ pub mod stream; pub mod task; #[doc(hidden)] pub use self::task::Poll; + +#[cfg(feature = "std")] +pub mod io; +#[cfg(feature = "std")] +#[doc(hidden)] pub use self::io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml deleted file mode 100644 index 6060521d4e..0000000000 --- a/futures-io/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "futures-io-preview" -edition = "2018" -version = "0.3.0-alpha.16" -authors = ["Alex Crichton "] -license = "MIT OR Apache-2.0" -repository = "https://github.com/rust-lang-nursery/futures-rs" -homepage = "https://rust-lang-nursery.github.io/futures-rs" -documentation = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.16/futures_io" -description = """ -The `AsyncRead` and `AsyncWrite` traits for the futures-rs library. -""" - -[lib] -name = "futures_io" - -[features] -std = ["futures-core-preview/std"] -default = ["std"] - -[dependencies] -futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.16", default-features = false } - -[dev-dependencies] -futures-preview = { path = "../futures", version = "=0.3.0-alpha.16" } -assert_matches = "1.3.0" diff --git a/futures-io/LICENSE-APACHE b/futures-io/LICENSE-APACHE deleted file mode 120000 index 965b606f33..0000000000 --- a/futures-io/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/futures-io/LICENSE-MIT b/futures-io/LICENSE-MIT deleted file mode 120000 index 76219eb72e..0000000000 --- a/futures-io/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs deleted file mode 100644 index a73c3f6bc0..0000000000 --- a/futures-io/src/lib.rs +++ /dev/null @@ -1,665 +0,0 @@ -//! Asynchronous I/O -//! -//! This crate contains the `AsyncRead`, `AsyncWrite`, `AsyncSeek`, and -//! `AsyncBufRead` traits, the asynchronous analogs to -//! `std::io::{Read, Write, Seek, BufRead}`. The primary difference is -//! that these traits integrate with the asynchronous task system. -//! -//! All items of this library are only available when the `std` feature of this -//! library is activated, and it is activated by default. - -#![cfg_attr(not(feature = "std"), no_std)] - -#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)] -// It cannot be included in the published code because this lints have false positives in the minimum required version. -#![cfg_attr(test, warn(single_use_lifetimes))] -#![warn(clippy::all)] - -#![doc(test(attr(deny(warnings), allow(dead_code, unused_assignments, unused_variables))))] - -#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.16/futures_io")] - -#[cfg(feature = "std")] -mod if_std { - use futures_core::task::{Context, Poll}; - use std::cmp; - use std::io as StdIo; - use std::ops::DerefMut; - use std::pin::Pin; - use std::ptr; - - // Re-export some types from `std::io` so that users don't have to deal - // with conflicts when `use`ing `futures::io` and `std::io`. - #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 - pub use self::StdIo::{ - Error as Error, - ErrorKind as ErrorKind, - Result as Result, - IoSlice as IoSlice, - IoSliceMut as IoSliceMut, - SeekFrom as SeekFrom, - }; - - /// A type used to conditionally initialize buffers passed to `AsyncRead` - /// methods, modeled after `std`. - #[derive(Debug)] - pub struct Initializer(bool); - - impl Initializer { - /// Returns a new `Initializer` which will zero out buffers. - #[inline] - pub fn zeroing() -> Initializer { - Initializer(true) - } - - /// Returns a new `Initializer` which will not zero out buffers. - /// - /// # Safety - /// - /// This method may only be called by `AsyncRead`ers which guarantee - /// that they will not read from the buffers passed to `AsyncRead` - /// methods, and that the return value of the method accurately reflects - /// the number of bytes that have been written to the head of the buffer. - #[inline] - pub unsafe fn nop() -> Initializer { - Initializer(false) - } - - /// Indicates if a buffer should be initialized. - #[inline] - pub fn should_initialize(&self) -> bool { - self.0 - } - - /// Initializes a buffer if necessary. - #[inline] - pub fn initialize(&self, buf: &mut [u8]) { - if self.should_initialize() { - unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) } - } - } - } - - /// Read bytes asynchronously. - /// - /// This trait is analogous to the `std::io::Read` trait, but integrates - /// with the asynchronous task system. In particular, the `poll_read` - /// method, unlike `Read::read`, will automatically queue the current task - /// for wakeup and return if data is not yet available, rather than blocking - /// the calling thread. - pub trait AsyncRead { - /// Determines if this `AsyncRead`er can work with buffers of - /// uninitialized memory. - /// - /// The default implementation returns an initializer which will zero - /// buffers. - /// - /// # Safety - /// - /// This method is `unsafe` because and `AsyncRead`er could otherwise - /// return a non-zeroing `Initializer` from another `AsyncRead` type - /// without an `unsafe` block. - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::zeroing() - } - - /// Attempt to read from the `AsyncRead` into `buf`. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. - /// - /// If no data is available for reading, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// readable or is closed. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll>; - - /// Attempt to read from the `AsyncRead` into `bufs` using vectored - /// IO operations. - /// - /// This method is similar to `poll_read`, but allows data to be read - /// into multiple buffers using a single operation. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. - /// - /// If no data is available for reading, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// readable or is closed. - /// By default, this method delegates to using `poll_read` on the first - /// buffer in `bufs`. Objects which support vectored IO should override - /// this method. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll> - { - if let Some(ref mut first_iovec) = bufs.get_mut(0) { - self.poll_read(cx, first_iovec) - } else { - // `bufs` is empty. - Poll::Ready(Ok(0)) - } - } - } - - /// Write bytes asynchronously. - /// - /// This trait is analogous to the `std::io::Write` trait, but integrates - /// with the asynchronous task system. In particular, the `poll_write` - /// method, unlike `Write::write`, will automatically queue the current task - /// for wakeup and return if data is not yet available, rather than blocking - /// the calling thread. - pub trait AsyncWrite { - /// Attempt to write bytes from `buf` into the object. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. - /// - /// If the object is not ready for writing, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// writable or is closed. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> Poll>; - - /// Attempt to write bytes from `bufs` into the object using vectored - /// IO operations. - /// - /// This method is similar to `poll_write`, but allows data from multiple buffers to be written - /// using a single operation. - /// - /// On success, returns `Poll::Ready(Ok(num_bytes_written))`. - /// - /// If the object is not ready for writing, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// writable or is closed. - /// - /// By default, this method delegates to using `poll_write` on the first - /// buffer in `bufs`. Objects which support vectored IO should override - /// this method. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll> - { - if let Some(ref first_iovec) = bufs.get(0) { - self.poll_write(cx, &*first_iovec) - } else { - // `bufs` is empty. - Poll::Ready(Ok(0)) - } - } - - /// Attempt to flush the object, ensuring that any buffered data reach - /// their destination. - /// - /// On success, returns `Poll::Ready(Ok(()))`. - /// - /// If flushing cannot immediately complete, this method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make - /// progress towards flushing. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Attempt to close the object. - /// - /// On success, returns `Poll::Ready(Ok(()))`. - /// - /// If closing cannot immediately complete, this function returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object can make - /// progress towards closing. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - } - - /// Seek bytes asynchronously. - /// - /// This trait is analogous to the `std::io::Seek` trait, but integrates - /// with the asynchronous task system. In particular, the `poll_seek` - /// method, unlike `Seek::seek`, will automatically queue the current task - /// for wakeup and return if data is not yet available, rather than blocking - /// the calling thread. - pub trait AsyncSeek { - /// Attempt to seek to an offset, in bytes, in a stream. - /// - /// A seek beyond the end of a stream is allowed, but behavior is defined - /// by the implementation. - /// - /// If the seek operation completed successfully, - /// this method returns the new position from the start of the stream. - /// That position can be used later with [`SeekFrom::Start`]. - /// - /// # Errors - /// - /// Seeking to a negative offset is considered an error. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll>; - } - - /// Read bytes asynchronously. - /// - /// This trait is analogous to the `std::io::BufRead` trait, but integrates - /// with the asynchronous task system. In particular, the `poll_fill_buf` - /// method, unlike `BufRead::fill_buf`, will automatically queue the current task - /// for wakeup and return if data is not yet available, rather than blocking - /// the calling thread. - pub trait AsyncBufRead: AsyncRead { - /// Attempt to return the contents of the internal buffer, filling it with more data - /// from the inner reader if it is empty. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If no data is available for reading, the method returns - /// `Poll::Pending` and arranges for the current task (via - /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes - /// readable or is closed. - /// - /// This function is a lower-level call. It needs to be paired with the - /// [`consume`] method to function properly. When calling this - /// method, none of the contents will be "read" in the sense that later - /// calling [`poll_read`] may return the same contents. As such, [`consume`] must - /// be called with the number of bytes that are consumed from this buffer to - /// ensure that the bytes are never returned twice. - /// - /// [`poll_read`]: AsyncRead::poll_read - /// [`consume`]: AsyncBufRead::consume - /// - /// An empty buffer returned indicates that the stream has reached EOF. - /// - /// # Implementation - /// - /// This function may not return errors of kind `WouldBlock` or - /// `Interrupted`. Implementations must convert `WouldBlock` into - /// `Poll::Pending` and either internally retry or convert - /// `Interrupted` into another error kind. - fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) - -> Poll>; - - /// Tells this buffer that `amt` bytes have been consumed from the buffer, - /// so they should no longer be returned in calls to [`poll_read`]. - /// - /// This function is a lower-level call. It needs to be paired with the - /// [`poll_fill_buf`] method to function properly. This function does - /// not perform any I/O, it simply informs this object that some amount of - /// its buffer, returned from [`poll_fill_buf`], has been consumed and should - /// no longer be returned. As such, this function may do odd things if - /// [`poll_fill_buf`] isn't called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by - /// [`poll_fill_buf`]. - /// - /// [`poll_read`]: AsyncRead::poll_read - /// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf - fn consume(self: Pin<&mut Self>, amt: usize); - } - - macro_rules! deref_async_read { - () => { - unsafe fn initializer(&self) -> Initializer { - (**self).initializer() - } - - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - Pin::new(&mut **self).poll_read(cx, buf) - } - - fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll> - { - Pin::new(&mut **self).poll_read_vectored(cx, bufs) - } - } - } - - impl AsyncRead for Box { - deref_async_read!(); - } - - impl AsyncRead for &mut T { - deref_async_read!(); - } - - impl

AsyncRead for Pin

- where - P: DerefMut + Unpin, - P::Target: AsyncRead, - { - unsafe fn initializer(&self) -> Initializer { - (**self).initializer() - } - - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - self.get_mut().as_mut().poll_read(cx, buf) - } - - fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll> - { - self.get_mut().as_mut().poll_read_vectored(cx, bufs) - } - } - - /// `unsafe` because the `StdIo::Read` type must not access the buffer - /// before reading data into it. - macro_rules! unsafe_delegate_async_read_to_stdio { - () => { - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } - - fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - Poll::Ready(StdIo::Read::read(&mut *self, buf)) - } - - fn poll_read_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>]) - -> Poll> - { - Poll::Ready(StdIo::Read::read_vectored(&mut *self, bufs)) - } - } - } - - impl AsyncRead for &[u8] { - unsafe_delegate_async_read_to_stdio!(); - } - - impl AsyncRead for StdIo::Repeat { - unsafe_delegate_async_read_to_stdio!(); - } - - impl AsyncRead for StdIo::Empty { - unsafe_delegate_async_read_to_stdio!(); - } - - impl + Unpin> AsyncRead for StdIo::Cursor { - unsafe_delegate_async_read_to_stdio!(); - } - - macro_rules! deref_async_write { - () => { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> Poll> - { - Pin::new(&mut **self).poll_write(cx, buf) - } - - fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll> - { - Pin::new(&mut **self).poll_write_vectored(cx, bufs) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_close(cx) - } - } - } - - impl AsyncWrite for Box { - deref_async_write!(); - } - - impl AsyncWrite for &mut T { - deref_async_write!(); - } - - impl

AsyncWrite for Pin

- where - P: DerefMut + Unpin, - P::Target: AsyncWrite, - { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) - -> Poll> - { - self.get_mut().as_mut().poll_write(cx, buf) - } - - fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll> - { - self.get_mut().as_mut().poll_write_vectored(cx, bufs) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.get_mut().as_mut().poll_close(cx) - } - } - - macro_rules! delegate_async_write_to_stdio { - () => { - fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) - -> Poll> - { - Poll::Ready(StdIo::Write::write(&mut *self, buf)) - } - - fn poll_write_vectored(mut self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll> - { - Poll::Ready(StdIo::Write::write_vectored(&mut *self, bufs)) - } - - fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(StdIo::Write::flush(&mut *self)) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } - } - } - - impl + Unpin> AsyncWrite for StdIo::Cursor { - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let position = self.position(); - let result = { - let out = (&mut *self).get_mut().as_mut(); - let pos = cmp::min(out.len() as u64, position) as usize; - StdIo::Write::write(&mut &mut out[pos..], buf) - }; - if let Ok(offset) = result { - self.get_mut().set_position(position + offset as u64); - } - Poll::Ready(result) - } - - fn poll_write_vectored(self: Pin<&mut Self>, _: &mut Context<'_>, bufs: &[IoSlice<'_>]) - -> Poll> - { - Poll::Ready(StdIo::Write::write_vectored(&mut self.get_mut().get_mut().as_mut(), bufs)) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(StdIo::Write::flush(&mut self.get_mut().get_mut().as_mut())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } - } - - impl AsyncWrite for Vec { - delegate_async_write_to_stdio!(); - } - - impl AsyncWrite for StdIo::Sink { - delegate_async_write_to_stdio!(); - } - - macro_rules! deref_async_seek { - () => { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { - Pin::new(&mut **self).poll_seek(cx, pos) - } - } - } - - impl AsyncSeek for Box { - deref_async_seek!(); - } - - impl AsyncSeek for &mut T { - deref_async_seek!(); - } - - - impl

AsyncSeek for Pin

- where - P: DerefMut + Unpin, - P::Target: AsyncSeek, - { - fn poll_seek(self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { - self.get_mut().as_mut().poll_seek(cx, pos) - } - } - - macro_rules! delegate_async_seek_to_stdio { - () => { - fn poll_seek(mut self: Pin<&mut Self>, _: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { - Poll::Ready(StdIo::Seek::seek(&mut *self, pos)) - } - } - } - - impl + Unpin> AsyncSeek for StdIo::Cursor { - delegate_async_seek_to_stdio!(); - } - - macro_rules! deref_async_buf_read { - () => { - fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) - -> Poll> - { - Pin::new(&mut **self.get_mut()).poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - Pin::new(&mut **self.get_mut()).consume(amt) - } - } - } - - impl AsyncBufRead for Box { - deref_async_buf_read!(); - } - - impl AsyncBufRead for &mut T { - deref_async_buf_read!(); - } - - impl

AsyncBufRead for Pin

- where - P: DerefMut + Unpin, - P::Target: AsyncBufRead, - { - fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) - -> Poll> - { - self.get_mut().as_mut().poll_fill_buf(cx) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.get_mut().as_mut().consume(amt) - } - } - - macro_rules! delegate_async_buf_read_to_stdio { - () => { - fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) - -> Poll> - { - Poll::Ready(StdIo::BufRead::fill_buf(self.get_mut())) - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - StdIo::BufRead::consume(self.get_mut(), amt) - } - } - } - - impl AsyncBufRead for &[u8] { - delegate_async_buf_read_to_stdio!(); - } - - impl AsyncBufRead for StdIo::Empty { - delegate_async_buf_read_to_stdio!(); - } - - impl + Unpin> AsyncBufRead for StdIo::Cursor { - delegate_async_buf_read_to_stdio!(); - } -} - -#[cfg(feature = "std")] -pub use self::if_std::*; diff --git a/futures-test/Cargo.toml b/futures-test/Cargo.toml index 5630a2f354..faa103bffb 100644 --- a/futures-test/Cargo.toml +++ b/futures-test/Cargo.toml @@ -16,7 +16,6 @@ name = "futures_test" [dependencies] futures-core-preview = { version = "=0.3.0-alpha.16", path = "../futures-core", default-features = false } -futures-io-preview = { version = "=0.3.0-alpha.16", path = "../futures-io", default-features = false } futures-util-preview = { version = "=0.3.0-alpha.16", path = "../futures-util", default-features = false } futures-executor-preview = { version = "=0.3.0-alpha.16", path = "../futures-executor", default-features = false } pin-utils = { version = "0.1.0-alpha.4", default-features = false } @@ -26,4 +25,4 @@ futures-preview = { version = "=0.3.0-alpha.16", path = "../futures", default-fe [features] default = ["std"] -std = ["futures-core-preview/std", "futures-io-preview/std", "futures-util-preview/std", "futures-executor-preview/std"] +std = ["futures-core-preview/std", "futures-util-preview/std", "futures-executor-preview/std"] diff --git a/futures-test/src/interleave_pending.rs b/futures-test/src/interleave_pending.rs index aac3bac9a0..5a0ee1a5fd 100644 --- a/futures-test/src/interleave_pending.rs +++ b/futures-test/src/interleave_pending.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::stream::Stream; -use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; +use futures_core::io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ pin::Pin, diff --git a/futures-test/src/io/limited.rs b/futures-test/src/io/limited.rs index 5d6228d9d3..b015f22f89 100644 --- a/futures-test/src/io/limited.rs +++ b/futures-test/src/io/limited.rs @@ -1,4 +1,4 @@ -use futures_io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; +use futures_core::io::{self as io, AsyncBufRead, AsyncRead, AsyncWrite}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::{ cmp, diff --git a/futures-test/src/io/read/mod.rs b/futures-test/src/io/read/mod.rs index 064f646f5f..0bb023fd3e 100644 --- a/futures-test/src/io/read/mod.rs +++ b/futures-test/src/io/read/mod.rs @@ -1,6 +1,6 @@ //! Additional combinators for testing async readers. -use futures_io::AsyncRead; +use futures_core::io::AsyncRead; pub use super::limited::Limited; pub use crate::interleave_pending::InterleavePending; diff --git a/futures-test/src/io/write/mod.rs b/futures-test/src/io/write/mod.rs index 5c506178da..e216f5509c 100644 --- a/futures-test/src/io/write/mod.rs +++ b/futures-test/src/io/write/mod.rs @@ -1,6 +1,6 @@ //! Additional combinators for testing async writers. -use futures_io::AsyncWrite; +use futures_core::io::AsyncWrite; pub use super::limited::Limited; pub use crate::interleave_pending::InterleavePending; diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 1c668b45b1..4d15ef38fd 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -15,7 +15,7 @@ Common utilities and extension traits for the futures-rs library. name = "futures_util" [features] -std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "slab", "memchr"] +std = ["alloc", "futures-core-preview/std", "futures-sink-preview/std", "slab", "memchr"] default = ["std"] async-await = ["std", "futures-select-macro-preview", "proc-macro-hack", "proc-macro-nested", "rand", "rand_core"] compat = ["std", "futures_01"] @@ -29,7 +29,6 @@ alloc = ["futures-core-preview/alloc", "futures-sink-preview/alloc"] [dependencies] futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.16", default-features = false } futures-channel-preview = { path = "../futures-channel", version = "=0.3.0-alpha.16", default-features = false } -futures-io-preview = { path = "../futures-io", version = "=0.3.0-alpha.16", default-features = false } futures-sink-preview = { path = "../futures-sink", version = "=0.3.0-alpha.16", default-features = false} futures-select-macro-preview = { path = "../futures-select-macro", version = "=0.3.0-alpha.16", default-features = false, optional = true } proc-macro-hack = { version = "0.5", optional = true } diff --git a/futures-util/src/compat/compat01as03.rs b/futures-util/src/compat/compat01as03.rs index a7d0489c3c..ae4e99be78 100644 --- a/futures-util/src/compat/compat01as03.rs +++ b/futures-util/src/compat/compat01as03.rs @@ -339,7 +339,7 @@ unsafe impl UnsafeNotify01 for NotifyWaker { #[cfg(feature = "io-compat")] mod io { use super::*; - use futures_io::{ + use futures_core::io::{ AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer, }; use std::io::Error; @@ -347,8 +347,8 @@ mod io { /// Extension trait for tokio-io [`AsyncRead`](tokio_io::AsyncRead) pub trait AsyncRead01CompatExt: AsyncRead01 { - /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures-io 0.3 - /// [`AsyncRead`](futures_io::AsyncRead). + /// Converts a tokio-io [`AsyncRead`](tokio_io::AsyncRead) into a futures 0.3 + /// [`AsyncRead`](futures_core::io::AsyncRead). /// /// ``` /// #![feature(async_await, impl_trait_in_bindings)] @@ -376,8 +376,8 @@ mod io { /// Extension trait for tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) pub trait AsyncWrite01CompatExt: AsyncWrite01 { - /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures-io 0.3 - /// [`AsyncWrite`](futures_io::AsyncWrite). + /// Converts a tokio-io [`AsyncWrite`](tokio_io::AsyncWrite) into a futures 0.3 + /// [`AsyncWrite`](futures_core::io::AsyncWrite). /// /// ``` /// #![feature(async_await, impl_trait_in_bindings)] diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index cdc3cf9fd6..a34d8904c2 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -215,7 +215,7 @@ where #[cfg(feature = "io-compat")] mod io { use super::*; - use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; + use futures_core::io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03}; use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01}; fn poll_03_to_io(x: task03::Poll>) diff --git a/futures-util/src/io/allow_std.rs b/futures-util/src/io/allow_std.rs index 4e752bb1e9..8d75d491a2 100644 --- a/futures-util/src/io/allow_std.rs +++ b/futures-util/src/io/allow_std.rs @@ -1,5 +1,5 @@ use futures_core::task::{Context, Poll}; -use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom}; +use futures_core::io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom}; use std::{fmt, io}; use std::pin::Pin; diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs index a8d5e6c57f..8076a4e446 100644 --- a/futures-util/src/io/buf_reader.rs +++ b/futures-util/src/io/buf_reader.rs @@ -1,5 +1,5 @@ use futures_core::task::{Context, Poll}; -use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom}; +use futures_core::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::io::{self, Read}; use std::pin::Pin; @@ -22,7 +22,7 @@ use super::DEFAULT_BUF_SIZE; /// discarded. Creating multiple instances of a `BufReader` on the same /// stream can cause data loss. /// -/// [`AsyncRead`]: futures_io::AsyncRead +/// [`AsyncRead`]: futures_core::io::AsyncRead /// // TODO: Examples pub struct BufReader { @@ -220,7 +220,7 @@ impl AsyncSeek for BufReader { /// To seek without discarding the internal buffer, use /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). /// - /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. + /// See [`AsyncSeek`](futures_core::io::AsyncSeek) for more details. /// /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` /// where `n` minus the internal buffer length overflows an `i64`, two diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index ae8198c2bc..c71e316b25 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -1,5 +1,5 @@ use futures_core::task::{Context, Poll}; -use futures_io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; +use futures_core::io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::fmt; use std::io::{self, Write}; @@ -23,7 +23,7 @@ use super::DEFAULT_BUF_SIZE; /// stream can cause data loss. If you need to write out the contents of its /// buffer, you must manually call flush before the writer is dropped. /// -/// [`AsyncWrite`]: futures_io::AsyncWrite +/// [`AsyncWrite`]: futures_core::io::AsyncWrite /// [`flush`]: super::AsyncWriteExt::flush /// // TODO: Examples diff --git a/futures-util/src/io/close.rs b/futures-util/src/io/close.rs index ecc204c3f8..18b7cdd99f 100644 --- a/futures-util/src/io/close.rs +++ b/futures-util/src/io/close.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncWrite; +use futures_core::io::AsyncWrite; use std::io; use std::pin::Pin; diff --git a/futures-util/src/io/copy_buf_into.rs b/futures-util/src/io/copy_buf_into.rs index 37b122a8d9..ef32a56d80 100644 --- a/futures-util/src/io/copy_buf_into.rs +++ b/futures-util/src/io/copy_buf_into.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::{AsyncBufRead, AsyncWrite}; +use futures_core::io::{AsyncBufRead, AsyncWrite}; use std::io; use std::pin::Pin; diff --git a/futures-util/src/io/copy_into.rs b/futures-util/src/io/copy_into.rs index 198386da49..6dbee833ee 100644 --- a/futures-util/src/io/copy_into.rs +++ b/futures-util/src/io/copy_into.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::{AsyncRead, AsyncWrite}; +use futures_core::io::{AsyncRead, AsyncWrite}; use std::io; use std::pin::Pin; use super::{BufReader, CopyBufInto}; diff --git a/futures-util/src/io/flush.rs b/futures-util/src/io/flush.rs index 18a1f8e1cb..8faa1e0d37 100644 --- a/futures-util/src/io/flush.rs +++ b/futures-util/src/io/flush.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncWrite; +use futures_core::io::AsyncWrite; use std::io; use std::pin::Pin; diff --git a/futures-util/src/io/into_sink.rs b/futures-util/src/io/into_sink.rs index 3573519406..616b3b8ad0 100644 --- a/futures-util/src/io/into_sink.rs +++ b/futures-util/src/io/into_sink.rs @@ -1,5 +1,5 @@ use futures_core::task::{Context, Poll}; -use futures_io::AsyncWrite; +use futures_core::io::AsyncWrite; use futures_sink::Sink; use std::io; use std::pin::Pin; diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs index b2933fcb66..9c2e94aa0c 100644 --- a/futures-util/src/io/lines.rs +++ b/futures-util/src/io/lines.rs @@ -1,6 +1,6 @@ use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use futures_io::AsyncBufRead; +use futures_core::io::AsyncBufRead; use std::io; use std::mem; use std::pin::Pin; diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index ac9964725d..1154c6a14c 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -9,7 +9,7 @@ //! This module is only available when the `std` feature of this //! library is activated, and it is activated by default. -pub use futures_io::{ +pub use futures_core::io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom, }; diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index 9517e4e4eb..e70008fc7c 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncBufRead; +use futures_core::io::AsyncBufRead; use std::io; use std::mem; use std::pin::Pin; diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs index 691e66dd09..e198718b3e 100644 --- a/futures-util/src/io/read_to_end.rs +++ b/futures-util/src/io/read_to_end.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncRead; +use futures_core::io::AsyncRead; use std::io; use std::pin::Pin; use std::vec::Vec; diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs index 01e1ad827d..1267f4a2ab 100644 --- a/futures-util/src/io/read_until.rs +++ b/futures-util/src/io/read_until.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncBufRead; +use futures_core::io::AsyncBufRead; use std::io; use std::mem; use std::pin::Pin; diff --git a/futures-util/src/io/split.rs b/futures-util/src/io/split.rs index d12df867fc..f88487fb13 100644 --- a/futures-util/src/io/split.rs +++ b/futures-util/src/io/split.rs @@ -1,6 +1,6 @@ use crate::lock::BiLock; use futures_core::task::{Context, Poll}; -use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; +use futures_core::io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; use std::io; use std::pin::Pin; diff --git a/futures-util/src/io/write_all.rs b/futures-util/src/io/write_all.rs index afcbb4ae52..6904293bfa 100644 --- a/futures-util/src/io/write_all.rs +++ b/futures-util/src/io/write_all.rs @@ -1,6 +1,6 @@ use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_io::AsyncWrite; +use futures_core::io::AsyncWrite; use std::io; use std::mem; use std::pin::Pin; diff --git a/futures-util/src/try_stream/into_async_read.rs b/futures-util/src/try_stream/into_async_read.rs index 007df3d5de..e2e5ae02e0 100644 --- a/futures-util/src/try_stream/into_async_read.rs +++ b/futures-util/src/try_stream/into_async_read.rs @@ -2,7 +2,7 @@ use crate::try_stream::TryStreamExt; use core::pin::Pin; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; -use futures_io::{AsyncRead, AsyncBufRead}; +use futures_core::io::{AsyncRead, AsyncBufRead}; use std::cmp; use std::io::{Error, Result}; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index 4af997d18a..3f8ce099cf 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -26,7 +26,6 @@ appveyor = { repository = "rust-lang-nursery/futures-rs" } futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.16", default-features = false } futures-channel-preview = { path = "../futures-channel", version = "=0.3.0-alpha.16", default-features = false } futures-executor-preview = { path = "../futures-executor", version = "=0.3.0-alpha.16", default-features = false } -futures-io-preview = { path = "../futures-io", version = "=0.3.0-alpha.16", default-features = false } futures-sink-preview = { path = "../futures-sink", version = "=0.3.0-alpha.16", default-features = false } futures-util-preview = { path = "../futures-util", version = "=0.3.0-alpha.16", default-features = false } @@ -34,10 +33,11 @@ futures-util-preview = { path = "../futures-util", version = "=0.3.0-alpha.16", pin-utils = "0.1.0-alpha.4" futures-test-preview = { path = "../futures-test", version = "=0.3.0-alpha.16" } tokio = "0.1.11" +assert_matches = "1.3.0" [features] nightly = ["futures-core-preview/nightly", "futures-sink-preview/nightly", "futures-util-preview/nightly"] -std = ["alloc", "futures-core-preview/std", "futures-executor-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-util-preview/std"] +std = ["alloc", "futures-core-preview/std", "futures-executor-preview/std", "futures-sink-preview/std", "futures-util-preview/std"] async-await = ["futures-util-preview/async-await"] default = ["std"] compat = ["std", "futures-util-preview/compat"] diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 2c85f743e8..6d39575954 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -60,7 +60,7 @@ compile_error!("The `never-type` feature requires the `nightly` feature as an ex #[doc(hidden)] pub use futures_util::sink::SinkExt; #[cfg(feature = "std")] -#[doc(hidden)] pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; +#[doc(hidden)] pub use futures_core::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead}; #[cfg(feature = "std")] #[doc(hidden)] pub use futures_util::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt}; @@ -295,7 +295,7 @@ pub mod io { //! This module is only available when the `std` feature of this //! library is activated, and it is activated by default. - pub use futures_io::{ + pub use futures_core::io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, Initializer, IoSlice, IoSliceMut, Result, SeekFrom, }; diff --git a/futures-io/tests/cursor.rs b/futures/tests/io_cursor.rs similarity index 100% rename from futures-io/tests/cursor.rs rename to futures/tests/io_cursor.rs