Skip to content

Commit 4480042

Browse files
authored
Add Peekable::{peek_mut, poll_peek_mut} (#2488)
1 parent 0e3e48c commit 4480042

File tree

5 files changed

+131
-5
lines changed

5 files changed

+131
-5
lines changed

futures-util/src/stream/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1919
mod stream;
2020
pub use self::stream::{
2121
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
22-
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, Peekable, Scan, SelectNextSome, Skip,
23-
SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold, TryForEach,
24-
Unzip, Zip,
22+
Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
23+
Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, TryFold,
24+
TryForEach, Unzip, Zip,
2525
};
2626

2727
#[cfg(feature = "std")]

futures-util/src/stream/stream/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub use self::select_next_some::SelectNextSome;
131131

132132
mod peek;
133133
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134-
pub use self::peek::{NextIf, NextIfEq, Peek, Peekable};
134+
pub use self::peek::{NextIf, NextIfEq, Peek, PeekMut, Peekable};
135135

136136
mod skip;
137137
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411

futures-util/src/stream/stream/peek.rs

+91-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl<St: Stream> Peekable<St> {
3333

3434
delegate_access_inner!(stream, St, (.));
3535

36-
/// Produces a `Peek` future which retrieves a reference to the next item
36+
/// Produces a future which retrieves a reference to the next item
3737
/// in the stream, or `None` if the underlying stream terminates.
3838
pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> {
3939
Peek { inner: Some(self) }
@@ -57,6 +57,54 @@ impl<St: Stream> Peekable<St> {
5757
})
5858
}
5959

60+
/// Produces a future which retrieves a mutable reference to the next item
61+
/// in the stream, or `None` if the underlying stream terminates.
62+
///
63+
/// # Examples
64+
///
65+
/// ```
66+
/// # futures::executor::block_on(async {
67+
/// use futures::stream::{self, StreamExt};
68+
/// use futures::pin_mut;
69+
///
70+
/// let stream = stream::iter(vec![1, 2, 3]).peekable();
71+
/// pin_mut!(stream);
72+
///
73+
/// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1));
74+
/// assert_eq!(stream.as_mut().next().await, Some(1));
75+
///
76+
/// // Peek into the stream and modify the value which will be returned next
77+
/// if let Some(p) = stream.as_mut().peek_mut().await {
78+
/// if *p == 2 {
79+
/// *p = 5;
80+
/// }
81+
/// }
82+
///
83+
/// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]);
84+
/// # });
85+
/// ```
86+
pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> {
87+
PeekMut { inner: Some(self) }
88+
}
89+
90+
/// Peek retrieves a mutable reference to the next item in the stream.
91+
pub fn poll_peek_mut(
92+
self: Pin<&mut Self>,
93+
cx: &mut Context<'_>,
94+
) -> Poll<Option<&mut St::Item>> {
95+
let mut this = self.project();
96+
97+
Poll::Ready(loop {
98+
if this.peeked.is_some() {
99+
break this.peeked.as_mut();
100+
} else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
101+
*this.peeked = Some(item);
102+
} else {
103+
break None;
104+
}
105+
})
106+
}
107+
60108
/// Creates a future which will consume and return the next value of this
61109
/// stream if a condition is true.
62110
///
@@ -220,6 +268,48 @@ where
220268
}
221269
}
222270

271+
pin_project! {
272+
/// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method.
273+
#[must_use = "futures do nothing unless polled"]
274+
pub struct PeekMut<'a, St: Stream> {
275+
inner: Option<Pin<&'a mut Peekable<St>>>,
276+
}
277+
}
278+
279+
impl<St> fmt::Debug for PeekMut<'_, St>
280+
where
281+
St: Stream + fmt::Debug,
282+
St::Item: fmt::Debug,
283+
{
284+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285+
f.debug_struct("PeekMut").field("inner", &self.inner).finish()
286+
}
287+
}
288+
289+
impl<St: Stream> FusedFuture for PeekMut<'_, St> {
290+
fn is_terminated(&self) -> bool {
291+
self.inner.is_none()
292+
}
293+
}
294+
295+
impl<'a, St> Future for PeekMut<'a, St>
296+
where
297+
St: Stream,
298+
{
299+
type Output = Option<&'a mut St::Item>;
300+
301+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
302+
let inner = self.project().inner;
303+
if let Some(peekable) = inner {
304+
ready!(peekable.as_mut().poll_peek_mut(cx));
305+
306+
inner.take().unwrap().poll_peek_mut(cx)
307+
} else {
308+
panic!("PeekMut polled after completion")
309+
}
310+
}
311+
}
312+
223313
pin_project! {
224314
/// Future for the [`Peekable::next_if`](self::Peekable::next_if) method.
225315
#[must_use = "futures do nothing unless polled"]

futures/tests/auto_traits.rs

+22
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,13 @@ pub mod future {
470470
assert_not_impl!(PollFn<*const ()>: Sync);
471471
assert_impl!(PollFn<PhantomPinned>: Unpin);
472472

473+
assert_impl!(PollImmediate<SendStream>: Send);
474+
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
475+
assert_impl!(PollImmediate<SyncStream>: Sync);
476+
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
477+
assert_impl!(PollImmediate<UnpinStream>: Unpin);
478+
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
479+
473480
assert_impl!(Ready<()>: Send);
474481
assert_not_impl!(Ready<*const ()>: Send);
475482
assert_impl!(Ready<()>: Sync);
@@ -1430,6 +1437,14 @@ pub mod stream {
14301437
assert_not_impl!(Peek<'_, LocalStream<()>>: Sync);
14311438
assert_impl!(Peek<'_, PinnedStream>: Unpin);
14321439

1440+
assert_impl!(PeekMut<'_, SendStream<()>>: Send);
1441+
assert_not_impl!(PeekMut<'_, SendStream>: Send);
1442+
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Send);
1443+
assert_impl!(PeekMut<'_, SyncStream<()>>: Sync);
1444+
assert_not_impl!(PeekMut<'_, SyncStream>: Sync);
1445+
assert_not_impl!(PeekMut<'_, LocalStream<()>>: Sync);
1446+
assert_impl!(PeekMut<'_, PinnedStream>: Unpin);
1447+
14331448
assert_impl!(Peekable<SendStream<()>>: Send);
14341449
assert_not_impl!(Peekable<SendStream>: Send);
14351450
assert_not_impl!(Peekable<LocalStream>: Send);
@@ -1451,6 +1466,13 @@ pub mod stream {
14511466
assert_not_impl!(PollFn<*const ()>: Sync);
14521467
assert_impl!(PollFn<PhantomPinned>: Unpin);
14531468

1469+
assert_impl!(PollImmediate<SendStream>: Send);
1470+
assert_not_impl!(PollImmediate<LocalStream<()>>: Send);
1471+
assert_impl!(PollImmediate<SyncStream>: Sync);
1472+
assert_not_impl!(PollImmediate<LocalStream<()>>: Sync);
1473+
assert_impl!(PollImmediate<UnpinStream>: Unpin);
1474+
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
1475+
14541476
assert_impl!(ReadyChunks<SendStream<()>>: Send);
14551477
assert_not_impl!(ReadyChunks<SendStream>: Send);
14561478
assert_not_impl!(ReadyChunks<LocalStream>: Send);

futures/tests/stream_peekable.rs

+14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,20 @@ fn peekable() {
1212
});
1313
}
1414

15+
#[test]
16+
fn peekable_mut() {
17+
block_on(async {
18+
let s = stream::iter(vec![1u8, 2, 3]).peekable();
19+
pin_mut!(s);
20+
if let Some(p) = s.as_mut().peek_mut().await {
21+
if *p == 1 {
22+
*p = 5;
23+
}
24+
}
25+
assert_eq!(s.collect::<Vec<_>>().await, vec![5, 2, 3]);
26+
});
27+
}
28+
1529
#[test]
1630
fn peekable_next_if_eq() {
1731
block_on(async {

0 commit comments

Comments
 (0)