Skip to content

Commit 2b23aa7

Browse files
authored
util: add back public poll_read_buf() function (#3079)
This was accidentally removed in #3064.
1 parent 382ee6b commit 2b23aa7

File tree

6 files changed

+49
-9
lines changed

6 files changed

+49
-9
lines changed

tokio-util/CHANGELOG.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
### Added
2+
- io: `poll_read_buf` util fn (#2972).
3+
14
# 0.5.0 (October 30, 2020)
25

36
### Changed
47
- io: update `bytes` to 0.6 (#3071).
58

6-
### Added
7-
- io: `poll_read_buf` util fn (#2972).
8-
99
# 0.4.0 (October 15, 2020)
1010

1111
### Added

tokio-util/src/codec/framed_impl.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ where
150150
// got room for at least one byte to read to ensure that we don't
151151
// get a spurious 0 that looks like EOF
152152
state.buffer.reserve(1);
153-
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
153+
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
154154
Poll::Ready(ct) => ct,
155155
Poll::Pending => return Poll::Pending,
156156
};

tokio-util/src/io/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ mod stream_reader;
1313
pub use self::read_buf::read_buf;
1414
pub use self::reader_stream::ReaderStream;
1515
pub use self::stream_reader::StreamReader;
16+
pub use crate::util::poll_read_buf;

tokio-util/src/io/read_buf.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ where
5959

6060
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
6161
let this = &mut *self;
62-
crate::util::poll_read_buf(cx, Pin::new(this.0), this.1)
62+
crate::util::poll_read_buf(Pin::new(this.0), cx, this.1)
6363
}
6464
}
6565
}

tokio-util/src/io/reader_stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
8383
this.buf.reserve(CAPACITY);
8484
}
8585

86-
match poll_read_buf(cx, reader, &mut this.buf) {
86+
match poll_read_buf(reader, cx, &mut this.buf) {
8787
Poll::Pending => Poll::Pending,
8888
Poll::Ready(Err(err)) => {
8989
self.project().reader.set(None);

tokio-util/src/lib.rs

+42-3
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,49 @@ mod util {
6969
use std::pin::Pin;
7070
use std::task::{Context, Poll};
7171

72-
pub(crate) fn poll_read_buf<T: AsyncRead>(
73-
cx: &mut Context<'_>,
72+
/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait.
73+
///
74+
/// [`Buf`]: bytes::Buf
75+
///
76+
/// # Example
77+
///
78+
/// ```
79+
/// use bytes::{Bytes, BytesMut};
80+
/// use tokio::stream;
81+
/// use tokio::io::Result;
82+
/// use tokio_util::io::{StreamReader, poll_read_buf};
83+
/// use futures::future::poll_fn;
84+
/// use std::pin::Pin;
85+
/// # #[tokio::main]
86+
/// # async fn main() -> std::io::Result<()> {
87+
///
88+
/// // Create a reader from an iterator. This particular reader will always be
89+
/// // ready.
90+
/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
91+
///
92+
/// let mut buf = BytesMut::new();
93+
/// let mut reads = 0;
94+
///
95+
/// loop {
96+
/// reads += 1;
97+
/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
98+
///
99+
/// if n == 0 {
100+
/// break;
101+
/// }
102+
/// }
103+
///
104+
/// // one or more reads might be necessary.
105+
/// assert!(reads >= 1);
106+
/// assert_eq!(&buf[..], &[0, 1, 2, 3]);
107+
/// # Ok(())
108+
/// # }
109+
/// ```
110+
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
111+
pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
74112
io: Pin<&mut T>,
75-
buf: &mut impl BufMut,
113+
cx: &mut Context<'_>,
114+
buf: &mut B,
76115
) -> Poll<io::Result<usize>> {
77116
if !buf.has_remaining_mut() {
78117
return Poll::Ready(Ok(0));

0 commit comments

Comments
 (0)