Skip to content

Commit 78f13ee

Browse files
Sam Reissrijs
Sam Reis
authored andcommitted
refactor(lib): convert to use tokio 0.1
1 parent 4857674 commit 78f13ee

20 files changed

+574
-449
lines changed

Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@ base64 = "0.9"
2525
bytes = "0.4.4"
2626
futures = "0.1.17"
2727
futures-cpupool = "0.1.6"
28+
futures-timer = "0.1.0"
2829
http = "0.1.5"
2930
httparse = "1.0"
3031
iovec = "0.1"
3132
language-tags = "0.2"
3233
log = "0.4"
3334
mime = "0.3.2"
35+
net2 = "0.2.32"
3436
percent-encoding = "1.0"
3537
relay = "0.1"
3638
time = "0.1"
37-
tokio-core = "0.1.11"
39+
tokio = "0.1.3"
40+
tokio-executor = "0.1.0"
3841
tokio-service = "0.1"
3942
tokio-io = "0.1"
4043
unicase = "2.0"

examples/client.rs

+19-18
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
//#![deny(warnings)]
22
extern crate futures;
33
extern crate hyper;
4-
extern crate tokio_core;
4+
extern crate tokio;
55

66
extern crate pretty_env_logger;
77

88
use std::env;
99
use std::io::{self, Write};
1010

11-
use futures::Future;
12-
use futures::stream::Stream;
11+
use futures::{Future, Stream};
12+
use futures::future::lazy;
1313

1414
use hyper::{Body, Client, Request};
1515

@@ -30,22 +30,23 @@ fn main() {
3030
return;
3131
}
3232

33-
let mut core = tokio_core::reactor::Core::new().unwrap();
34-
let handle = core.handle();
35-
let client = Client::new(&handle);
33+
tokio::run(lazy(move || {
34+
let client = Client::default();
3635

37-
let mut req = Request::new(Body::empty());
38-
*req.uri_mut() = url;
39-
let work = client.request(req).and_then(|res| {
40-
println!("Response: {}", res.status());
41-
println!("Headers: {:#?}", res.headers());
36+
let mut req = Request::new(Body::empty());
37+
*req.uri_mut() = url;
4238

43-
res.into_parts().1.into_stream().for_each(|chunk| {
44-
io::stdout().write_all(&chunk).map_err(From::from)
45-
})
46-
}).map(|_| {
47-
println!("\n\nDone.");
48-
});
39+
client.request(req).and_then(|res| {
40+
println!("Response: {}", res.status());
41+
println!("Headers: {:#?}", res.headers());
4942

50-
core.run(work).unwrap();
43+
res.into_parts().1.into_stream().for_each(|chunk| {
44+
io::stdout().write_all(&chunk).map_err(From::from)
45+
})
46+
}).map(|_| {
47+
println!("\n\nDone.");
48+
}).map_err(|err| {
49+
eprintln!("Error {}", err);
50+
})
51+
}));
5152
}

examples/hello.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
extern crate hyper;
33
extern crate futures;
44
extern crate pretty_env_logger;
5+
extern crate tokio;
6+
7+
use futures::Future;
8+
use futures::future::lazy;
59

610
use hyper::{Body, Response};
711
use hyper::server::{Http, const_service, service_fn};
@@ -16,10 +20,13 @@ fn main() {
1620
Ok(Response::new(Body::from(PHRASE)))
1721
}));
1822

19-
let server = Http::new()
20-
.sleep_on_errors(true)
21-
.bind(&addr, new_service)
22-
.unwrap();
23-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
24-
server.run().unwrap();
23+
tokio::run(lazy(move || {
24+
let server = Http::new()
25+
.sleep_on_errors(true)
26+
.bind(&addr, new_service)
27+
.unwrap();
28+
29+
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
30+
server.run().map_err(|err| eprintln!("Server error {}", err))
31+
}));
2532
}

examples/multi_server.rs

+16-20
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
#![deny(warnings)]
22
extern crate hyper;
33
extern crate futures;
4-
extern crate tokio_core;
54
extern crate pretty_env_logger;
5+
extern crate tokio;
66

77
use futures::{Future, Stream};
8-
use futures::future::FutureResult;
8+
use futures::future::{FutureResult, lazy};
99

1010
use hyper::{Body, Method, Request, Response, StatusCode};
11-
use tokio_core::reactor::Core;
1211
use hyper::server::{Http, Service};
1312

1413
static INDEX1: &'static [u8] = b"The 1st service!";
@@ -44,26 +43,23 @@ fn main() {
4443
let addr1 = "127.0.0.1:1337".parse().unwrap();
4544
let addr2 = "127.0.0.1:1338".parse().unwrap();
4645

47-
let mut core = Core::new().unwrap();
48-
let handle = core.handle();
46+
tokio::run(lazy(move || {
47+
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
48+
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
4949

50-
let srv1 = Http::new().serve_addr_handle(&addr1, &handle, || Ok(Srv(INDEX1))).unwrap();
51-
let srv2 = Http::new().serve_addr_handle(&addr2, &handle, || Ok(Srv(INDEX2))).unwrap();
50+
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
51+
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
5252

53-
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
54-
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
53+
tokio::spawn(srv1.for_each(move |conn| {
54+
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
55+
Ok(())
56+
}).map_err(|_| ()));
5557

56-
let handle1 = handle.clone();
57-
handle.spawn(srv1.for_each(move |conn| {
58-
handle1.spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
59-
Ok(())
60-
}).map_err(|_| ()));
58+
tokio::spawn(srv2.for_each(move |conn| {
59+
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
60+
Ok(())
61+
}).map_err(|_| ()));
6162

62-
let handle2 = handle.clone();
63-
handle.spawn(srv2.for_each(move |conn| {
64-
handle2.spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
6563
Ok(())
66-
}).map_err(|_| ()));
67-
68-
core.run(futures::future::empty::<(), ()>()).unwrap();
64+
}));
6965
}

examples/params.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
extern crate futures;
33
extern crate hyper;
44
extern crate pretty_env_logger;
5+
extern crate tokio;
56
extern crate url;
67

78
use futures::{Future, Stream};
9+
use futures::future::lazy;
810

911
use hyper::{Body, Method, Request, Response, StatusCode};
1012
use hyper::server::{Http, Service};
@@ -22,7 +24,7 @@ impl Service for ParamExample {
2224
type Request = Request<Body>;
2325
type Response = Response<Body>;
2426
type Error = hyper::Error;
25-
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
27+
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;
2628

2729
fn call(&self, req: Request<Body>) -> Self::Future {
2830
match (req.method(), req.uri().path()) {
@@ -96,7 +98,9 @@ fn main() {
9698
pretty_env_logger::init();
9799
let addr = "127.0.0.1:1337".parse().unwrap();
98100

99-
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
100-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
101-
server.run().unwrap();
101+
tokio::run(lazy(move || {
102+
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
103+
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
104+
server.run().map_err(|err| eprintln!("Server error {}", err))
105+
}));
102106
}

examples/send_file.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
extern crate futures;
33
extern crate hyper;
44
extern crate pretty_env_logger;
5+
extern crate tokio;
56

67
use futures::{Future/*, Sink*/};
8+
use futures::future::lazy;
79
use futures::sync::oneshot;
810

911
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
@@ -17,7 +19,7 @@ use std::thread;
1719
static NOTFOUND: &[u8] = b"Not Found";
1820
static INDEX: &str = "examples/send_file_index.html";
1921

20-
fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error>> {
22+
fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error> + Send> {
2123
// Serve a file by reading it entirely into memory. As a result
2224
// this is limited to serving small files, but it is somewhat
2325
// simpler with a little less overhead.
@@ -63,7 +65,7 @@ impl Service for ResponseExamples {
6365
type Request = Request<Body>;
6466
type Response = Response<Body>;
6567
type Error = hyper::Error;
66-
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
68+
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;
6769

6870
fn call(&self, req: Request<Body>) -> Self::Future {
6971
match (req.method(), req.uri().path()) {
@@ -139,7 +141,9 @@ fn main() {
139141
pretty_env_logger::init();
140142
let addr = "127.0.0.1:1337".parse().unwrap();
141143

142-
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
143-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
144-
server.run().unwrap();
144+
tokio::run(lazy(move || {
145+
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
146+
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
147+
server.run().map_err(|err| eprintln!("Server error {}", err))
148+
}));
145149
}

examples/server.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22
extern crate futures;
33
extern crate hyper;
44
extern crate pretty_env_logger;
5+
extern crate tokio;
56

6-
use futures::future::FutureResult;
7+
use futures::Future;
8+
use futures::future::{FutureResult, lazy};
79

810
use hyper::{Body, Method, Request, Response, StatusCode};
911
use hyper::server::{Http, Service};
@@ -41,7 +43,9 @@ fn main() {
4143
pretty_env_logger::init();
4244
let addr = "127.0.0.1:1337".parse().unwrap();
4345

44-
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
45-
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
46-
server.run().unwrap();
46+
tokio::run(lazy(move || {
47+
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
48+
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
49+
server.run().map_err(|err| eprintln!("Server error {}", err))
50+
}));
4751
}

examples/web_api.rs

+13-16
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
extern crate futures;
33
extern crate hyper;
44
extern crate pretty_env_logger;
5-
extern crate tokio_core;
5+
extern crate tokio;
66

77
use futures::{Future, Stream};
8+
use futures::future::lazy;
9+
use tokio::reactor::Handle;
810

911
use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
1012
use hyper::server::{Http, Service};
@@ -17,13 +19,13 @@ static URL: &str = "http://127.0.0.1:1337/web_api";
1719
static INDEX: &[u8] = b"<a href=\"test.html\">test.html</a>";
1820
static LOWERCASE: &[u8] = b"i am a lower case string";
1921

20-
struct ResponseExamples(tokio_core::reactor::Handle);
22+
struct ResponseExamples(Handle);
2123

2224
impl Service for ResponseExamples {
2325
type Request = Request<Body>;
2426
type Response = Response<Body>;
2527
type Error = hyper::Error;
26-
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
28+
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;
2729

2830
fn call(&self, req: Self::Request) -> Self::Future {
2931
match (req.method(), req.uri().path()) {
@@ -76,18 +78,13 @@ fn main() {
7678
pretty_env_logger::init();
7779
let addr = "127.0.0.1:1337".parse().unwrap();
7880

79-
let mut core = tokio_core::reactor::Core::new().unwrap();
80-
let handle = core.handle();
81-
let client_handle = core.handle();
81+
tokio::run(lazy(move || {
82+
let handle = Handle::current();
83+
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap();
84+
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
8285

83-
let serve = Http::new().serve_addr_handle(&addr, &handle, move || Ok(ResponseExamples(client_handle.clone()))).unwrap();
84-
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
85-
86-
let h2 = handle.clone();
87-
handle.spawn(serve.for_each(move |conn| {
88-
h2.spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)));
89-
Ok(())
90-
}).map_err(|_| ()));
91-
92-
core.run(futures::future::empty::<(), ()>()).unwrap();
86+
serve.map_err(|_| ()).for_each(move |conn| {
87+
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
88+
})
89+
}));
9390
}

src/client/conn.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ where
208208
}
209209

210210
//TODO: replace with `impl Future` when stable
211-
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)>> {
211+
pub(crate) fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)> + Send> {
212212
let inner = match self.dispatch.try_send(req) {
213213
Ok(rx) => {
214214
Either::A(rx.then(move |res| {

0 commit comments

Comments
 (0)