Skip to content

Commit 87bea01

Browse files
committed
refactor(lib): convert to futures 0.2.0-beta
1 parent 5db8531 commit 87bea01

33 files changed

+1346
-1327
lines changed

Cargo.toml

+8-8
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,17 @@ include = [
2222

2323
[dependencies]
2424
bytes = "0.4.4"
25-
futures = "0.1.17"
26-
futures-cpupool = "0.1.6"
27-
futures-timer = "0.1.0"
25+
futures = "0.2.0-beta"
26+
futures-timer = { git = "https://github.com/alexcrichton/futures-timer.git" }
2827
http = "0.1.5"
2928
httparse = "1.0"
3029
iovec = "0.1"
3130
log = "0.4"
3231
net2 = "0.2.32"
3332
time = "0.1"
34-
tokio = "0.1.3"
35-
tokio-executor = "0.1.0"
36-
tokio-service = "0.1"
37-
tokio-io = "0.1"
38-
want = "0.0.2"
33+
tokio = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
34+
tokio-executor = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
35+
want = { git = "https://github.com/srijs/want.git", branch = "futures-0.2" }
3936

4037
[dev-dependencies]
4138
num_cpus = "1.0"
@@ -45,3 +42,6 @@ url = "1.0"
4542

4643
[features]
4744
nightly = []
45+
46+
[replace]
47+
"futures:0.2.0-beta" = { git = "https://github.com/srijs/futures-rs.git", branch = "with-executor" }

benches/end_to_end.rs

+20-14
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ extern crate tokio;
88

99
use std::net::SocketAddr;
1010

11-
use futures::{Future, Stream};
11+
use futures::{FutureExt, StreamExt};
12+
use futures::executor::block_on;
1213
use tokio::runtime::Runtime;
1314
use tokio::net::TcpListener;
1415

@@ -22,19 +23,20 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
2223
let addr = spawn_hello(&mut rt);
2324

2425
let client = hyper::Client::configure()
25-
.build_with_executor(&rt.handle(), rt.executor());
26+
.build(&rt.handle());
2627

2728
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
2829

2930
b.bytes = 160 * 2 + PHRASE.len() as u64;
3031
b.iter(move || {
31-
client.get(url.clone())
32+
block_on(client.get(url.clone())
33+
.with_executor(rt.executor())
3234
.and_then(|res| {
3335
res.into_body().into_stream().for_each(|_chunk| {
3436
Ok(())
35-
})
37+
}).map(|_| ())
3638
})
37-
.wait().expect("client wait");
39+
).expect("client wait");
3840
});
3941
}
4042

@@ -44,7 +46,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
4446
let addr = spawn_hello(&mut rt);
4547

4648
let client = hyper::Client::configure()
47-
.build_with_executor(&rt.handle(), rt.executor());
49+
.build(&rt.handle());
4850

4951
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
5052

@@ -54,11 +56,14 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
5456
let mut req = Request::new(post.into());
5557
*req.method_mut() = Method::POST;
5658
*req.uri_mut() = url.clone();
57-
client.request(req).and_then(|res| {
58-
res.into_body().into_stream().for_each(|_chunk| {
59-
Ok(())
59+
block_on(client.request(req)
60+
.with_executor(rt.executor())
61+
.and_then(|res| {
62+
res.into_body().into_stream().for_each(|_chunk| {
63+
Ok(())
64+
}).map(|_| ())
6065
})
61-
}).wait().expect("client wait");
66+
).expect("client wait");
6267

6368
});
6469
}
@@ -76,21 +81,22 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
7681
let service = const_service(service_fn(|req: Request<Body>| {
7782
req.into_body()
7883
.into_stream()
79-
.concat2()
84+
.concat()
8085
.map(|_| {
8186
Response::new(Body::from(PHRASE))
8287
})
8388
}));
8489

8590
let srv = listener.incoming()
86-
.into_future()
91+
.next()
8792
.map_err(|(e, _inc)| panic!("accept error: {}", e))
8893
.and_then(move |(accepted, _inc)| {
8994
let socket = accepted.expect("accepted socket");
9095
http.serve_connection(socket, service.new_service().expect("new_service"))
9196
.map(|_| ())
9297
.map_err(|_| ())
93-
});
94-
rt.spawn(srv);
98+
})
99+
.map_err(|_| panic!("server error"));
100+
rt.spawn2(srv);
95101
return addr
96102
}

benches/server.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use std::io::{Read, Write};
1111
use std::net::{TcpListener, TcpStream};
1212
use std::sync::mpsc;
1313

14-
use futures::{future, stream, Future, Stream};
15-
use futures::sync::oneshot;
14+
use futures::{future, stream, FutureExt, StreamExt};
15+
use futures::channel::oneshot;
1616

1717
use hyper::{Body, Request, Response};
1818
use hyper::server::Service;
@@ -31,7 +31,7 @@ macro_rules! bench_server {
3131
})).unwrap();
3232
let addr = srv.local_addr().unwrap();
3333
addr_tx.send(addr).unwrap();
34-
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
34+
tokio::runtime::run2(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
3535
});
3636

3737
addr_rx.recv().unwrap()

examples/client.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ extern crate pretty_env_logger;
88
use std::env;
99
use std::io::{self, Write};
1010

11-
use futures::{Future, Stream};
11+
use futures::{FutureExt, StreamExt};
1212
use futures::future::lazy;
1313

1414
use hyper::{Body, Client, Request};
@@ -30,7 +30,7 @@ fn main() {
3030
return;
3131
}
3232

33-
tokio::run(lazy(move || {
33+
tokio::runtime::run2(lazy(move |_| {
3434
let client = Client::default();
3535

3636
let mut req = Request::new(Body::empty());
@@ -43,10 +43,13 @@ fn main() {
4343
res.into_parts().1.into_stream().for_each(|chunk| {
4444
io::stdout().write_all(&chunk).map_err(From::from)
4545
})
46-
}).map(|_| {
47-
println!("\n\nDone.");
48-
}).map_err(|err| {
49-
eprintln!("Error {}", err);
46+
}).then(|result| {
47+
if let Some(err) = result.err() {
48+
eprintln!("Error {}", err);
49+
} else {
50+
println!("\n\nDone.");
51+
}
52+
Ok(())
5053
})
5154
}));
5255
}

examples/hello.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate futures;
44
extern crate pretty_env_logger;
55
extern crate tokio;
66

7-
use futures::Future;
7+
use futures::FutureExt;
88
use futures::future::lazy;
99

1010
use hyper::{Body, Response};
@@ -20,13 +20,13 @@ fn main() {
2020
Ok(Response::new(Body::from(PHRASE)))
2121
}));
2222

23-
tokio::run(lazy(move || {
23+
tokio::runtime::run2(lazy(move |_| {
2424
let server = Http::new()
2525
.sleep_on_errors(true)
2626
.bind(&addr, new_service)
2727
.unwrap();
2828

29-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
30-
server.run().map_err(|err| eprintln!("Server error {}", err))
29+
println!("Listening on http://{}", server.local_addr().unwrap());
30+
server.run().map_err(|err| panic!("Server error {}", err))
3131
}));
3232
}

examples/multi_server.rs

+9-10
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ extern crate futures;
44
extern crate pretty_env_logger;
55
extern crate tokio;
66

7-
use futures::{Future, Stream};
7+
use futures::{FutureExt, StreamExt};
88
use futures::future::{FutureResult, lazy};
9+
use futures::executor::spawn;
910

1011
use hyper::{Body, Method, Request, Response, StatusCode};
1112
use hyper::server::{Http, Service};
@@ -43,22 +44,20 @@ fn main() {
4344
let addr1 = "127.0.0.1:1337".parse().unwrap();
4445
let addr2 = "127.0.0.1:1338".parse().unwrap();
4546

46-
tokio::run(lazy(move || {
47+
tokio::runtime::run2(lazy(move |_| {
4748
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
4849
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
4950

5051
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
5152
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
5253

53-
tokio::spawn(srv1.for_each(move |conn| {
54-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
55-
Ok(())
56-
}).map_err(|_| ()));
54+
spawn(srv1.map_err(|err| panic!("srv1 error: {:?}", err)).for_each(move |conn| {
55+
spawn(conn.map(|_| ()).map_err(|err| panic!("srv1 error: {:?}", err)))
56+
}).map(|_| ()));
5757

58-
tokio::spawn(srv2.for_each(move |conn| {
59-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
60-
Ok(())
61-
}).map_err(|_| ()));
58+
spawn(srv2.map_err(|err| panic!("srv2 error: {:?}", err)).for_each(move |conn| {
59+
spawn(conn.map(|_| ()).map_err(|err| panic!("srv2 error: {:?}", err)))
60+
}).map(|_| ()));
6261

6362
Ok(())
6463
}));

examples/params.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ extern crate pretty_env_logger;
55
extern crate tokio;
66
extern crate url;
77

8-
use futures::{Future, Stream};
8+
use futures::{Future, FutureExt, StreamExt};
99
use futures::future::lazy;
1010

1111
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -32,7 +32,7 @@ impl Service for ParamExample {
3232
Box::new(futures::future::ok(Response::new(INDEX.into())))
3333
},
3434
(&Method::POST, "/post") => {
35-
Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
35+
Box::new(req.into_parts().1.into_stream().concat().map(|b| {
3636
// Parse the request body. form_urlencoded::parse
3737
// always succeeds, but in general parsing may
3838
// fail (for example, an invalid post of json), so
@@ -98,9 +98,11 @@ fn main() {
9898
pretty_env_logger::init();
9999
let addr = "127.0.0.1:1337".parse().unwrap();
100100

101-
tokio::run(lazy(move || {
101+
tokio::runtime::run2(lazy(move |_| {
102102
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
103-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
104-
server.run().map_err(|err| eprintln!("Server error {}", err))
103+
println!("Listening on http://{}", server.local_addr().unwrap());
104+
server.run().recover(|err| {
105+
eprintln!("Server error {}", err)
106+
})
105107
}));
106108
}

examples/send_file.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ extern crate hyper;
44
extern crate pretty_env_logger;
55
extern crate tokio;
66

7-
use futures::{Future/*, Sink*/};
7+
use futures::{Future, FutureExt};
88
use futures::future::lazy;
9-
use futures::sync::oneshot;
9+
use futures::channel::oneshot;
1010

1111
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
1212
use hyper::error::Error;
@@ -141,9 +141,9 @@ fn main() {
141141
pretty_env_logger::init();
142142
let addr = "127.0.0.1:1337".parse().unwrap();
143143

144-
tokio::run(lazy(move || {
144+
tokio::runtime::run2(lazy(move |_| {
145145
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
146-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
147-
server.run().map_err(|err| eprintln!("Server error {}", err))
146+
println!("Listening on http://{}", server.local_addr().unwrap());
147+
server.run().map_err(|err| panic!("Server error {}", err))
148148
}));
149149
}

examples/server.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ extern crate hyper;
44
extern crate pretty_env_logger;
55
extern crate tokio;
66

7-
use futures::Future;
7+
use futures::FutureExt;
88
use futures::future::{FutureResult, lazy};
99

1010
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -43,9 +43,11 @@ fn main() {
4343
pretty_env_logger::init();
4444
let addr = "127.0.0.1:1337".parse().unwrap();
4545

46-
tokio::run(lazy(move || {
46+
tokio::runtime::run2(lazy(move |_| {
4747
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
48-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
49-
server.run().map_err(|err| eprintln!("Server error {}", err))
48+
println!("Listening on http://{}", server.local_addr().unwrap());
49+
server.run().recover(|err| {
50+
eprintln!("Server error {}", err)
51+
})
5052
}));
5153
}

examples/web_api.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ extern crate hyper;
44
extern crate pretty_env_logger;
55
extern crate tokio;
66

7-
use futures::{Future, Stream};
7+
use futures::{Future, FutureExt, StreamExt};
8+
use futures::executor::spawn;
89
use futures::future::lazy;
910
use tokio::reactor::Handle;
1011

@@ -78,13 +79,15 @@ fn main() {
7879
pretty_env_logger::init();
7980
let addr = "127.0.0.1:1337".parse().unwrap();
8081

81-
tokio::run(lazy(move || {
82+
tokio::runtime::run2(lazy(move |_| {
8283
let handle = Handle::current();
8384
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap();
84-
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
85+
println!("Listening on http://{}", serve.incoming_ref().local_addr());
8586

86-
serve.map_err(|_| ()).for_each(move |conn| {
87-
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
88-
})
87+
serve.map_err(|err| panic!("server error {:?}", err)).for_each(move |conn| {
88+
spawn(conn.recover(|err| {
89+
println!("connection error: {:?}", err);
90+
}))
91+
}).map(|_| ())
8992
}));
9093
}

0 commit comments

Comments
 (0)