Skip to content

convert to use tokio 0.1 #1462

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ after_success:

env:
global:
- RUST_BACKTRACE=1
- secure: KipdEhZsGIrb2W0HsDbC95x8FJ1RKEWPq8uSK8wSZwGw6MtvoZDX0edfrtf4o3/skA0h84yn35ZWF/rpo1ZEesgFY1g+l+me+jtyGvMwEsXTGjNP4oNR2MrDizjO8eYDm4hRUCLEmJVvsq4j7oNVdLGHfdrcnwqk8/NxJsRzqXM=
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,19 @@ base64 = "0.9"
bytes = "0.4.4"
futures = "0.1.17"
futures-cpupool = "0.1.6"
futures-timer = "0.1.0"
http = "0.1.5"
httparse = "1.0"
iovec = "0.1"
language-tags = "0.2"
log = "0.4"
mime = "0.3.2"
net2 = "0.2.32"
percent-encoding = "1.0"
relay = "0.1"
time = "0.1"
tokio-core = "0.1.11"
tokio = "0.1.3"
tokio-executor = "0.1.0"
tokio-service = "0.1"
tokio-io = "0.1"
unicase = "2.0"
Expand Down
66 changes: 31 additions & 35 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,47 @@
extern crate futures;
extern crate hyper;
extern crate test;
extern crate tokio_core;
extern crate tokio;

use std::net::SocketAddr;

use futures::{Future, Stream};
use tokio_core::reactor::{Core, Handle};
use tokio_core::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::net::TcpListener;

use hyper::{Body, Method, Request, Response};
use hyper::server::Http;


#[bench]
fn get_one_at_a_time(b: &mut test::Bencher) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = spawn_hello(&handle);
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::new(&handle);
let client = hyper::Client::configure()
.build_with_executor(&rt.handle(), rt.executor());

let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();

b.bytes = 160 * 2 + PHRASE.len() as u64;
b.iter(move || {
let work = client.get(url.clone()).and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
client.get(url.clone())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
})
})
});

core.run(work).unwrap();
.wait().expect("client wait");
});
}

#[bench]
fn post_one_at_a_time(b: &mut test::Bencher) {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mut core = Core::new().unwrap();
let handle = core.handle();
let addr = spawn_hello(&handle);
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let client = hyper::Client::new(&handle);
let client = hyper::Client::configure()
.build_with_executor(&rt.handle(), rt.executor());

let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();

Expand All @@ -55,26 +54,24 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut req = Request::new(post.into());
*req.method_mut() = Method::POST;
*req.uri_mut() = url.clone();
let work = client.request(req).and_then(|res| {
client.request(req).and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
})
});
}).wait().expect("client wait");

core.run(work).unwrap();
});
}

static PHRASE: &'static [u8] = include_bytes!("../CHANGELOG.md"); //b"Hello, World!";

fn spawn_hello(handle: &Handle) -> SocketAddr {
fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
use hyper::server::{const_service, service_fn, NewService};
let addr = "127.0.0.1:0".parse().unwrap();
let listener = TcpListener::bind(&addr, handle).unwrap();
let listener = TcpListener::bind(&addr).unwrap();
let addr = listener.local_addr().unwrap();

let handle2 = handle.clone();
let http = hyper::server::Http::<hyper::Chunk>::new();
let http = Http::<hyper::Chunk>::new();

let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
Expand All @@ -85,16 +82,15 @@ fn spawn_hello(handle: &Handle) -> SocketAddr {
})
}));

let mut conns = 0;
handle.spawn(listener.incoming().for_each(move |(socket, _addr)| {
conns += 1;
assert_eq!(conns, 1, "should only need 1 connection");
handle2.spawn(
http.serve_connection(socket, service.new_service()?)
let srv = listener.incoming()
.into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
.and_then(move |(accepted, _inc)| {
let socket = accepted.expect("accepted socket");
http.serve_connection(socket, service.new_service().expect("new_service"))
.map(|_| ())
.map_err(|_| ())
);
Ok(())
}).then(|_| Ok(())));
});
rt.spawn(srv);
return addr
}
3 changes: 2 additions & 1 deletion benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
extern crate test;
extern crate tokio;

use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
Expand All @@ -30,7 +31,7 @@ macro_rules! bench_server {
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
srv.run_until(until_rx.map_err(|_| ())).unwrap();
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
});

addr_rx.recv().unwrap()
Expand Down
37 changes: 19 additions & 18 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//#![deny(warnings)]
extern crate futures;
extern crate hyper;
extern crate tokio_core;
extern crate tokio;

extern crate pretty_env_logger;

use std::env;
use std::io::{self, Write};

use futures::Future;
use futures::stream::Stream;
use futures::{Future, Stream};
use futures::future::lazy;

use hyper::{Body, Client, Request};

Expand All @@ -30,22 +30,23 @@ fn main() {
return;
}

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
tokio::run(lazy(move || {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious what the lazy usage is for. Is that because calling all this code outside of tokio::run will panic?

Copy link
Contributor Author

@srijs srijs Mar 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, most constructors including Client::default now need a thread-local handle. When we change the API I think we should probably defer most or all I/O (like port binding) until the future is polled, but for now that's the compromise I made.

let client = Client::default();

let mut req = Request::new(Body::empty());
*req.uri_mut() = url;
let work = client.request(req).and_then(|res| {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());
let mut req = Request::new(Body::empty());
*req.uri_mut() = url;

res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
}).map(|_| {
println!("\n\nDone.");
});
client.request(req).and_then(|res| {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());

core.run(work).unwrap();
res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
}).map(|_| {
println!("\n\nDone.");
}).map_err(|err| {
eprintln!("Error {}", err);
})
}));
}
19 changes: 13 additions & 6 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
extern crate hyper;
extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::future::lazy;

use hyper::{Body, Response};
use hyper::server::{Http, const_service, service_fn};
Expand All @@ -16,10 +20,13 @@ fn main() {
Ok(Response::new(Body::from(PHRASE)))
}));

let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
tokio::run(lazy(move || {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();

println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the lines printing about using 1 thread can probably be adjusted now :D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, will do!

server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
36 changes: 16 additions & 20 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#![deny(warnings)]
extern crate hyper;
extern crate futures;
extern crate tokio_core;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future, Stream};
use futures::future::FutureResult;
use futures::future::{FutureResult, lazy};

use hyper::{Body, Method, Request, Response, StatusCode};
use tokio_core::reactor::Core;
use hyper::server::{Http, Service};

static INDEX1: &'static [u8] = b"The 1st service!";
Expand Down Expand Up @@ -44,26 +43,23 @@ fn main() {
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();

let mut core = Core::new().unwrap();
let handle = core.handle();
tokio::run(lazy(move || {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();

let srv1 = Http::new().serve_addr_handle(&addr1, &handle, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr_handle(&addr2, &handle, || Ok(Srv(INDEX2))).unwrap();
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());

println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
tokio::spawn(srv1.for_each(move |conn| {
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));

let handle1 = handle.clone();
handle.spawn(srv1.for_each(move |conn| {
handle1.spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
tokio::spawn(srv2.for_each(move |conn| {
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));

let handle2 = handle.clone();
handle.spawn(srv2.for_each(move |conn| {
handle2.spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));

core.run(futures::future::empty::<(), ()>()).unwrap();
}));
}
12 changes: 8 additions & 4 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
extern crate url;

use futures::{Future, Stream};
use futures::future::lazy;

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
Expand All @@ -22,7 +24,7 @@ impl Service for ParamExample {
type Request = Request<Body>;
type Response = Response<Body>;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn call(&self, req: Request<Body>) -> Self::Future {
match (req.method(), req.uri().path()) {
Expand Down Expand Up @@ -96,7 +98,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
14 changes: 9 additions & 5 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future/*, Sink*/};
use futures::future::lazy;
use futures::sync::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
Expand All @@ -17,7 +19,7 @@ use std::thread;
static NOTFOUND: &[u8] = b"Not Found";
static INDEX: &str = "examples/send_file_index.html";

fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error>> {
fn simple_file_send(f: &str) -> Box<Future<Item = Response<Body>, Error = hyper::Error> + Send> {
// Serve a file by reading it entirely into memory. As a result
// this is limited to serving small files, but it is somewhat
// simpler with a little less overhead.
Expand Down Expand Up @@ -63,7 +65,7 @@ impl Service for ResponseExamples {
type Request = Request<Body>;
type Response = Response<Body>;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
type Future = Box<Future<Item = Self::Response, Error = Self::Error> + Send>;

fn call(&self, req: Request<Body>) -> Self::Future {
match (req.method(), req.uri().path()) {
Expand Down Expand Up @@ -139,7 +141,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
Loading