Skip to content

refactor(lib): convert to futures 0.2.0-beta #1470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@ include = [

[dependencies]
bytes = "0.4.4"
futures = "0.1.17"
futures-cpupool = "0.1.6"
futures-timer = "0.1.0"
futures = "0.2.0-beta"
futures-timer = { git = "https://github.com/alexcrichton/futures-timer.git" }
http = "0.1.5"
httparse = "1.0"
iovec = "0.1"
log = "0.4"
net2 = "0.2.32"
time = "0.1"
tokio = "0.1.3"
tokio-executor = "0.1.0"
tokio-service = "0.1"
tokio-io = "0.1"
want = "0.0.2"
tokio = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
tokio-executor = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
want = { git = "https://github.com/srijs/want.git", branch = "futures-0.2" }

[dev-dependencies]
num_cpus = "1.0"
Expand All @@ -45,3 +42,6 @@ url = "1.0"

[features]
nightly = []

[replace]
"futures:0.2.0-beta" = { git = "https://github.com/srijs/futures-rs.git", branch = "with-executor" }
34 changes: 20 additions & 14 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ extern crate tokio;

use std::net::SocketAddr;

use futures::{Future, Stream};
use futures::{FutureExt, StreamExt};
use futures::executor::block_on;
use tokio::runtime::Runtime;
use tokio::net::TcpListener;

Expand All @@ -22,19 +23,20 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.handle(), rt.executor());
.build(&rt.handle());

let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();

b.bytes = 160 * 2 + PHRASE.len() as u64;
b.iter(move || {
client.get(url.clone())
block_on(client.get(url.clone())
.with_executor(rt.executor())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
})
}).map(|_| ())
})
.wait().expect("client wait");
).expect("client wait");
});
}

Expand All @@ -44,7 +46,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);

let client = hyper::Client::configure()
.build_with_executor(&rt.handle(), rt.executor());
.build(&rt.handle());

let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();

Expand All @@ -54,11 +56,14 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut req = Request::new(post.into());
*req.method_mut() = Method::POST;
*req.uri_mut() = url.clone();
client.request(req).and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
block_on(client.request(req)
.with_executor(rt.executor())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
}).map(|_| ())
})
}).wait().expect("client wait");
).expect("client wait");

});
}
Expand All @@ -76,21 +81,22 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let service = const_service(service_fn(|req: Request<Body>| {
req.into_body()
.into_stream()
.concat2()
.concat()
.map(|_| {
Response::new(Body::from(PHRASE))
})
}));

let srv = listener.incoming()
.into_future()
.next()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
.and_then(move |(accepted, _inc)| {
let socket = accepted.expect("accepted socket");
http.serve_connection(socket, service.new_service().expect("new_service"))
.map(|_| ())
.map_err(|_| ())
});
rt.spawn(srv);
})
.map_err(|_| panic!("server error"));
rt.spawn2(srv);
return addr
}
6 changes: 3 additions & 3 deletions benches/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;

use futures::{future, stream, Future, Stream};
use futures::sync::oneshot;
use futures::{future, stream, FutureExt, StreamExt};
use futures::channel::oneshot;

use hyper::{Body, Request, Response};
use hyper::server::Service;
Expand All @@ -31,7 +31,7 @@ macro_rules! bench_server {
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
tokio::runtime::run2(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
});

addr_rx.recv().unwrap()
Expand Down
15 changes: 9 additions & 6 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern crate pretty_env_logger;
use std::env;
use std::io::{self, Write};

use futures::{Future, Stream};
use futures::{FutureExt, StreamExt};
use futures::future::lazy;

use hyper::{Body, Client, Request};
Expand All @@ -30,7 +30,7 @@ fn main() {
return;
}

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let client = Client::default();

let mut req = Request::new(Body::empty());
Expand All @@ -43,10 +43,13 @@ fn main() {
res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
}).map(|_| {
println!("\n\nDone.");
}).map_err(|err| {
eprintln!("Error {}", err);
}).then(|result| {
if let Some(err) = result.err() {
eprintln!("Error {}", err);
} else {
println!("\n\nDone.");
}
Ok(())
})
}));
}
8 changes: 4 additions & 4 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::FutureExt;
use futures::future::lazy;

use hyper::{Body, Response};
Expand All @@ -20,13 +20,13 @@ fn main() {
Ok(Response::new(Body::from(PHRASE)))
}));

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();

println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
println!("Listening on http://{}", server.local_addr().unwrap());
server.run().map_err(|err| panic!("Server error {}", err))
}));
}
19 changes: 9 additions & 10 deletions examples/multi_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future, Stream};
use futures::{FutureExt, StreamExt};
use futures::future::{FutureResult, lazy};
use futures::executor::spawn;

use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
Expand Down Expand Up @@ -43,22 +44,20 @@ fn main() {
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();

println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());

tokio::spawn(srv1.for_each(move |conn| {
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
spawn(srv1.map_err(|err| panic!("srv1 error: {:?}", err)).for_each(move |conn| {
spawn(conn.map(|_| ()).map_err(|err| panic!("srv1 error: {:?}", err)))
}).map(|_| ()));

tokio::spawn(srv2.for_each(move |conn| {
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
Ok(())
}).map_err(|_| ()));
spawn(srv2.map_err(|err| panic!("srv2 error: {:?}", err)).for_each(move |conn| {
spawn(conn.map(|_| ()).map_err(|err| panic!("srv2 error: {:?}", err)))
}).map(|_| ()));

Ok(())
}));
Expand Down
12 changes: 7 additions & 5 deletions examples/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ extern crate pretty_env_logger;
extern crate tokio;
extern crate url;

use futures::{Future, Stream};
use futures::{Future, FutureExt, StreamExt};
use futures::future::lazy;

use hyper::{Body, Method, Request, Response, StatusCode};
Expand All @@ -32,7 +32,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
Box::new(req.into_parts().1.into_stream().concat().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
Expand Down Expand Up @@ -98,9 +98,11 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
println!("Listening on http://{}", server.local_addr().unwrap());
server.run().recover(|err| {
eprintln!("Server error {}", err)
})
}));
}
10 changes: 5 additions & 5 deletions examples/send_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future/*, Sink*/};
use futures::{Future, FutureExt};
use futures::future::lazy;
use futures::sync::oneshot;
use futures::channel::oneshot;

use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
Expand Down Expand Up @@ -141,9 +141,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
println!("Listening on http://{}", server.local_addr().unwrap());
server.run().map_err(|err| panic!("Server error {}", err))
}));
}
10 changes: 6 additions & 4 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;

use futures::Future;
use futures::FutureExt;
use futures::future::{FutureResult, lazy};

use hyper::{Body, Method, Request, Response, StatusCode};
Expand Down Expand Up @@ -43,9 +43,11 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().map_err(|err| eprintln!("Server error {}", err))
println!("Listening on http://{}", server.local_addr().unwrap());
server.run().recover(|err| {
eprintln!("Server error {}", err)
})
}));
}
17 changes: 10 additions & 7 deletions examples/web_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;

use futures::{Future, Stream};
use futures::{Future, FutureExt, StreamExt};
use futures::executor::spawn;
use futures::future::lazy;
use tokio::reactor::Handle;

use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};

#[allow(unused)]
#[allow(unused, deprecated)]
use std::ascii::AsciiExt;

static NOTFOUND: &[u8] = b"Not Found";
Expand Down Expand Up @@ -78,13 +79,15 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();

tokio::run(lazy(move || {
tokio::runtime::run2(lazy(move |_| {
let handle = Handle::current();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap();
println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
println!("Listening on http://{}", serve.incoming_ref().local_addr());

serve.map_err(|_| ()).for_each(move |conn| {
tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
})
serve.map_err(|err| panic!("server error {:?}", err)).for_each(move |conn| {
spawn(conn.recover(|err| {
println!("connection error: {:?}", err);
}))
}).map(|_| ())
}));
}
Loading