Skip to content

Commit efb8415

Browse files
committed
Merge branch 'master' into fs-stream-find-map
2 parents 45bd0ef + 6d1e71f commit efb8415

13 files changed

+266
-57
lines changed

.travis.yml

+20-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
language: rust
22

3-
env: RUSTFLAGS="-D warnings"
3+
env:
4+
- RUSTFLAGS="-D warnings"
5+
6+
# Cache the whole `~/.cargo` directory to keep `~/cargo/.crates.toml`.
7+
cache:
8+
directories:
9+
- /home/travis/.cargo
10+
11+
# Don't cache the cargo registry because it's too big.
12+
before_cache:
13+
- rm -rf /home/travis/.cargo/registry
414

515
matrix:
616
fast_finish: true
@@ -35,16 +45,15 @@ matrix:
3545
script:
3646
- cargo doc --features docs
3747

38-
# TODO(yoshuawuyts): re-enable mdbook
39-
# - name: book
40-
# rust: nightly
41-
# os: linux
42-
# before_script:
43-
# - test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
44-
# - cargo build # to find 'extern crate async_std' by `mdbook test`
45-
# script:
46-
# - mdbook build docs
47-
# - mdbook test -L ./target/debug/deps docs
48+
- name: book
49+
rust: nightly
50+
os: linux
51+
before_script:
52+
- test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
53+
- cargo build # to find 'extern crate async_std' by `mdbook test`
54+
script:
55+
- mdbook build docs
56+
- mdbook test -L ./target/debug/deps docs
4857

4958
script:
5059
- cargo check --features unstable --all --benches --bins --examples --tests

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ femme = "1.2.0"
4444
surf = "1.0.2"
4545
tempdir = "0.3.7"
4646

47+
# These are used by the book for examples
48+
futures-channel-preview = "0.3.0-alpha.18"
49+
futures-util-preview = "0.3.0-alpha.18"
50+
4751
[dev-dependencies.futures-preview]
4852
version = "0.3.0-alpha.18"
4953
features = ["std", "nightly", "async-await"]

docs/src/tutorial/all_together.md

+4-5
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,16 @@ At this point, we only need to start the broker to get a fully-functioning (in t
44

55
```rust,edition2018
66
# extern crate async_std;
7-
# extern crate futures;
7+
# extern crate futures_channel;
8+
# extern crate futures_util;
89
use async_std::{
910
io::{self, BufReader},
1011
net::{TcpListener, TcpStream, ToSocketAddrs},
1112
prelude::*,
1213
task,
1314
};
14-
use futures::{
15-
channel::mpsc,
16-
SinkExt,
17-
};
15+
use futures_channel::mpsc;
16+
use futures_util::SinkExt;
1817
use std::{
1918
collections::hash_map::{HashMap, Entry},
2019
sync::Arc,

docs/src/tutorial/clean_shutdown.md

+8-10
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,16 @@ Let's add waiting to the server:
2222

2323
```rust,edition2018
2424
# extern crate async_std;
25-
# extern crate futures;
25+
# extern crate futures_channel;
26+
# extern crate futures_util;
2627
# use async_std::{
2728
# io::{self, BufReader},
2829
# net::{TcpListener, TcpStream, ToSocketAddrs},
2930
# prelude::*,
3031
# task,
3132
# };
32-
# use futures::{
33-
# channel::mpsc,
34-
# SinkExt,
35-
# };
33+
# use futures_channel::mpsc;
34+
# use futures_util::SinkExt;
3635
# use std::{
3736
# collections::hash_map::{HashMap, Entry},
3837
# sync::Arc,
@@ -156,17 +155,16 @@ And to the broker:
156155

157156
```rust,edition2018
158157
# extern crate async_std;
159-
# extern crate futures;
158+
# extern crate futures_channel;
159+
# extern crate futures_util;
160160
# use async_std::{
161161
# io::{self, BufReader},
162162
# net::{TcpListener, TcpStream, ToSocketAddrs},
163163
# prelude::*,
164164
# task,
165165
# };
166-
# use futures::{
167-
# channel::mpsc,
168-
# SinkExt,
169-
# };
166+
# use futures_channel::mpsc;
167+
# use futures_util::SinkExt;
170168
# use std::{
171169
# collections::hash_map::{HashMap, Entry},
172170
# sync::Arc,

docs/src/tutorial/connecting_readers_and_writers.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
1212

1313
```rust,edition2018
1414
# extern crate async_std;
15-
# extern crate futures;
15+
# extern crate futures_channel;
16+
# extern crate futures_util;
1617
# use async_std::{
1718
# io::{Write},
1819
# net::TcpStream,
1920
# prelude::{Future, Stream},
2021
# task,
2122
# };
22-
# use futures::channel::mpsc;
23-
# use futures::sink::SinkExt;
23+
# use futures_channel::mpsc;
24+
# use futures_util::sink::SinkExt;
2425
# use std::sync::Arc;
2526
#
2627
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

docs/src/tutorial/handling_disconnection.md

+13-6
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ First, let's add a shutdown channel to the `client`:
1919

2020
```rust,edition2018
2121
# extern crate async_std;
22-
# extern crate futures;
22+
# extern crate futures_channel;
23+
# extern crate futures_util;
2324
# use async_std::net::TcpStream;
24-
# use futures::{channel::mpsc, SinkExt};
25+
# use futures_channel::mpsc;
26+
# use futures_util::SinkExt;
2527
# use std::sync::Arc;
2628
#
2729
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@@ -68,9 +70,11 @@ We use the `select` macro for this purpose:
6870

6971
```rust,edition2018
7072
# extern crate async_std;
71-
# extern crate futures;
73+
# extern crate futures_channel;
74+
# extern crate futures_util;
7275
# use async_std::{io::Write, net::TcpStream};
73-
use futures::{channel::mpsc, select, FutureExt, StreamExt};
76+
use futures_channel::mpsc;
77+
use futures_util::{select, FutureExt, StreamExt};
7478
# use std::sync::Arc;
7579
7680
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
@@ -118,15 +122,18 @@ The final code looks like this:
118122

119123
```rust,edition2018
120124
# extern crate async_std;
121-
# extern crate futures;
125+
# extern crate futures_channel;
126+
# extern crate futures_util;
122127
use async_std::{
123128
io::{BufReader, BufRead, Write},
124129
net::{TcpListener, TcpStream, ToSocketAddrs},
125130
task,
126131
};
127-
use futures::{channel::mpsc, future::Future, select, FutureExt, SinkExt, StreamExt};
132+
use futures_channel::mpsc;
133+
use futures_util::{select, FutureExt, SinkExt, StreamExt};
128134
use std::{
129135
collections::hash_map::{Entry, HashMap},
136+
future::Future,
130137
sync::Arc,
131138
};
132139

docs/src/tutorial/implementing_a_client.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ With async, we can just use the `select!` macro.
1616

1717
```rust,edition2018
1818
# extern crate async_std;
19-
# extern crate futures;
19+
# extern crate futures_util;
2020
use async_std::{
2121
io::{stdin, BufRead, BufReader, Write},
2222
net::{TcpStream, ToSocketAddrs},
2323
task,
2424
};
25-
use futures::{select, FutureExt, StreamExt};
25+
use futures_util::{select, FutureExt, StreamExt};
2626
2727
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
2828

docs/src/tutorial/sending_messages.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the
1313

1414
```rust,edition2018
1515
# extern crate async_std;
16-
# extern crate futures;
16+
# extern crate futures_channel;
17+
# extern crate futures_util;
1718
# use async_std::{
1819
# io::Write,
1920
# net::TcpStream,
2021
# prelude::Stream,
2122
# };
22-
use futures::channel::mpsc; // 1
23-
use futures::sink::SinkExt;
23+
use futures_channel::mpsc; // 1
24+
use futures_util::sink::SinkExt;
2425
use std::sync::Arc;
2526
2627
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

src/io/write/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ pub trait Write {
127127
/// #
128128
/// # Ok(()) }) }
129129
/// ```
130+
///
131+
/// [`write`]: #tymethod.write
130132
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>)
131133
where
132134
Self: Unpin,

src/stream/stream/filter_map.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::marker::PhantomData;
2+
use std::pin::Pin;
3+
use std::task::{Context, Poll};
4+
5+
/// A stream that both filters and maps.
6+
#[derive(Clone, Debug)]
7+
pub struct FilterMap<S, F, T, B> {
8+
stream: S,
9+
f: F,
10+
__from: PhantomData<T>,
11+
__to: PhantomData<B>,
12+
}
13+
14+
impl<S, F, T, B> FilterMap<S, F, T, B> {
15+
pin_utils::unsafe_pinned!(stream: S);
16+
pin_utils::unsafe_unpinned!(f: F);
17+
18+
pub(crate) fn new(stream: S, f: F) -> Self {
19+
FilterMap {
20+
stream,
21+
f,
22+
__from: PhantomData,
23+
__to: PhantomData,
24+
}
25+
}
26+
}
27+
28+
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B>
29+
where
30+
S: futures_core::stream::Stream,
31+
F: FnMut(S::Item) -> Option<B>,
32+
{
33+
type Item = B;
34+
35+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
37+
match next {
38+
Some(v) => match (self.as_mut().f())(v) {
39+
Some(b) => Poll::Ready(Some(b)),
40+
None => {
41+
cx.waker().wake_by_ref();
42+
Poll::Pending
43+
}
44+
},
45+
None => Poll::Ready(None),
46+
}
47+
}
48+
}

src/stream/stream/min_by.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,20 @@ use std::cmp::Ordering;
22
use std::pin::Pin;
33

44
use crate::future::Future;
5-
use crate::stream::Stream;
65
use crate::task::{Context, Poll};
76

8-
/// A future that yields the minimum item in a stream by a given comparison function.
9-
#[derive(Clone, Debug)]
10-
pub struct MinByFuture<S: Stream, F> {
7+
#[allow(missing_debug_implementations)]
8+
pub struct MinByFuture<S, F, T> {
119
stream: S,
1210
compare: F,
13-
min: Option<S::Item>,
11+
min: Option<T>,
1412
}
1513

16-
impl<S: Stream + Unpin, F> Unpin for MinByFuture<S, F> {}
14+
impl<S, F, T> MinByFuture<S, F, T> {
15+
pin_utils::unsafe_pinned!(stream: S);
16+
pin_utils::unsafe_unpinned!(compare: F);
17+
pin_utils::unsafe_unpinned!(min: Option<T>);
1718

18-
impl<S: Stream + Unpin, F> MinByFuture<S, F> {
1919
pub(super) fn new(stream: S, compare: F) -> Self {
2020
MinByFuture {
2121
stream,
@@ -25,25 +25,25 @@ impl<S: Stream + Unpin, F> MinByFuture<S, F> {
2525
}
2626
}
2727

28-
impl<S, F> Future for MinByFuture<S, F>
28+
impl<S, F> Future for MinByFuture<S, F, S::Item>
2929
where
30-
S: futures_core::stream::Stream + Unpin,
30+
S: futures_core::stream::Stream + Unpin + Sized,
3131
S::Item: Copy,
3232
F: FnMut(&S::Item, &S::Item) -> Ordering,
3333
{
3434
type Output = Option<S::Item>;
3535

3636
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37-
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
37+
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
3838

3939
match next {
4040
Some(new) => {
4141
cx.waker().wake_by_ref();
42-
match self.as_mut().min.take() {
43-
None => self.as_mut().min = Some(new),
44-
Some(old) => match (&mut self.as_mut().compare)(&new, &old) {
45-
Ordering::Less => self.as_mut().min = Some(new),
46-
_ => self.as_mut().min = Some(old),
42+
match self.as_mut().min().take() {
43+
None => *self.as_mut().min() = Some(new),
44+
Some(old) => match (&mut self.as_mut().compare())(&new, &old) {
45+
Ordering::Less => *self.as_mut().min() = Some(new),
46+
_ => *self.as_mut().min() = Some(old),
4747
},
4848
}
4949
Poll::Pending

0 commit comments

Comments
 (0)