From 152af752528f8513c77b757f7c5439d563e043c6 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Mon, 15 Jul 2019 00:51:09 +0800 Subject: [PATCH 1/4] docs(examples): update proxy example to async/await --- examples/proxy.rs | 48 ++++++++++++++++++++++++++++++++++++++ examples_disabled/proxy.rs | 44 ---------------------------------- 2 files changed, 48 insertions(+), 44 deletions(-) create mode 100644 examples/proxy.rs delete mode 100644 examples_disabled/proxy.rs diff --git a/examples/proxy.rs b/examples/proxy.rs new file mode 100644 index 0000000000..36f5e82315 --- /dev/null +++ b/examples/proxy.rs @@ -0,0 +1,48 @@ +#![feature(async_await)] +#![deny(warnings)] + +use hyper::{Client, Error, Server}; +use hyper::service::{make_service_fn, service_fn}; +use std::net::SocketAddr; + +#[hyper::rt::main] +async fn main() { + pretty_env_logger::init(); + + let in_addr = ([127, 0, 0, 1], 3001).into(); + let out_addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); + + let client_main = Client::new(); + + let out_addr_clone = out_addr.clone(); + + // The closure inside `make_service_fn` is run for each connection, + // creating a 'service' to handle requests for that specific connection. + let make_service = make_service_fn(move |_| { + let client = client_main.clone(); + + async move { + // This is the `Service` that will handle the connection. + // `service_fn` is a helper to convert a function that + // returns a Response into a `Service`. + Ok::<_, Error>(service_fn(move |mut req| { + let uri_string = format!("http://{}/{}", + out_addr_clone, + req.uri().path_and_query().map(|x| x.as_str()).unwrap_or("")); + let uri = uri_string.parse().unwrap(); + *req.uri_mut() = uri; + client.request(req) + })) + } + }); + + let server = Server::bind(&in_addr) + .serve(make_service); + + println!("Listening on http://{}", in_addr); + println!("Proxying on http://{}", out_addr); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +} diff --git a/examples_disabled/proxy.rs b/examples_disabled/proxy.rs deleted file mode 100644 index 49b7cf3e36..0000000000 --- a/examples_disabled/proxy.rs +++ /dev/null @@ -1,44 +0,0 @@ -#![deny(warnings)] -extern crate hyper; -extern crate pretty_env_logger; - -use hyper::{Client, Server}; -use hyper::service::service_fn; -use hyper::rt::{self, Future}; -use std::net::SocketAddr; - -fn main() { - pretty_env_logger::init(); - - let in_addr = ([127, 0, 0, 1], 3001).into(); - let out_addr: SocketAddr = ([127, 0, 0, 1], 3000).into(); - - let client_main = Client::new(); - - let out_addr_clone = out_addr.clone(); - // new_service is run for each connection, creating a 'service' - // to handle requests for that specific connection. - let new_service = move || { - let client = client_main.clone(); - // This is the `Service` that will handle the connection. - // `service_fn_ok` is a helper to convert a function that - // returns a Response into a `Service`. - service_fn(move |mut req| { - let uri_string = format!("http://{}/{}", - out_addr_clone, - req.uri().path_and_query().map(|x| x.as_str()).unwrap_or("")); - let uri = uri_string.parse().unwrap(); - *req.uri_mut() = uri; - client.request(req) - }) - }; - - let server = Server::bind(&in_addr) - .serve(new_service) - .map_err(|e| eprintln!("server error: {}", e)); - - println!("Listening on http://{}", in_addr); - println!("Proxying on http://{}", out_addr); - - rt::run(server); -} From 9002bacc6e647a747192e5f86175e0b9c9473651 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Mon, 15 Jul 2019 06:57:38 +0800 Subject: [PATCH 2/4] docs(examples): update state example to async/await --- examples/state.rs | 56 ++++++++++++++++++++++++++++++++++++++ examples_disabled/state.rs | 55 ------------------------------------- 2 files changed, 56 insertions(+), 55 deletions(-) create mode 100644 examples/state.rs delete mode 100644 examples_disabled/state.rs diff --git a/examples/state.rs b/examples/state.rs new file mode 100644 index 0000000000..4438959e3c --- /dev/null +++ b/examples/state.rs @@ -0,0 +1,56 @@ +#![feature(async_await)] +#![deny(warnings)] + +use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; + +use hyper::{Body, Error, Response, Server}; +use hyper::service::{make_service_fn, service_fn}; + +#[hyper::rt::main] +async fn main() { + pretty_env_logger::init(); + + let addr = ([127, 0, 0, 1], 3000).into(); + + // For the most basic of state, we just share a counter, that increments + // with each request, and we send its value back in the response. + let counter = Arc::new(AtomicUsize::new(0)); + + // The closure inside `make_service_fn` is run for each connection, + // creating a 'service' to handle requests for that specific connection. + let make_service = make_service_fn(move |_| { + // While the state was moved into the make_service closure, + // we need to clone it here because this closure is called + // once for every connection. + // + // Each connection could send multiple requests, so + // the `Service` needs a clone to handle later requests. + let counter = counter.clone(); + + async move { + // This is the `Service` that will handle the connection. + // `service_fn` is a helper to convert a function that + // returns a Response into a `Service`. + Ok::<_, Error>(service_fn(move |_req| { + // Get the current count, and also increment by 1, in a single + // atomic operation. + let count = counter.fetch_add(1, Ordering::AcqRel); + async move { + Ok::<_, Error>( + Response::new(Body::from(format!("Request #{}", count))) + ) + } + })) + } + }); + + let server = Server::bind(&addr) + .serve(make_service); + + println!("Listening on http://{}", addr); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +} + diff --git a/examples_disabled/state.rs b/examples_disabled/state.rs deleted file mode 100644 index 55f6d93a9c..0000000000 --- a/examples_disabled/state.rs +++ /dev/null @@ -1,55 +0,0 @@ -#![deny(warnings)] -extern crate hyper; -extern crate pretty_env_logger; -extern crate serde_json; - -use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; - -use hyper::{Body, Response, Server}; -use hyper::service::service_fn_ok; -use hyper::rt::{self, Future}; - -fn main() { - pretty_env_logger::init(); - - let addr = ([127, 0, 0, 1], 3000).into(); - - // For the most basic of state, we just share a counter, that increments - // with each request, and we send its value back in the response. - let counter = Arc::new(AtomicUsize::new(0)); - - // new_service is run for each connection, creating a 'service' - // to handle requests for that specific connection. - let new_service = move || { - // While the state was moved into the new_service closure, - // we need to clone it here because this closure is called - // once for every connection. - // - // Each connection could send multiple requests, so - // the `Service` needs a clone to handle later requests. - let counter = counter.clone(); - - // This is the `Service` that will handle the connection. - // `service_fn_ok` is a helper to convert a function that - // returns a Response into a `Service`. - // - // If you wanted to return a `Future` of a `Response`, such as because - // you wish to load data from a database or do other things, you - // could change this to `service_fn` instead. - service_fn_ok(move |_req| { - // Get the current count, and also increment by 1, in a single - // atomic operation. - let count = counter.fetch_add(1, Ordering::AcqRel); - Response::new(Body::from(format!("Request #{}", count))) - }) - }; - - let server = Server::bind(&addr) - .serve(new_service) - .map_err(|e| eprintln!("server error: {}", e)); - - println!("Listening on http://{}", addr); - - rt::run(server); -} - From 39846821d07307b331aaa5a09a45e77c78320c64 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Mon, 15 Jul 2019 07:31:48 +0800 Subject: [PATCH 3/4] docs(examples): update single_threaded example to async/await --- examples/single_threaded.rs | 55 ++++++++++++++++++++++++++++ examples_disabled/single_threaded.rs | 51 -------------------------- 2 files changed, 55 insertions(+), 51 deletions(-) create mode 100644 examples/single_threaded.rs delete mode 100644 examples_disabled/single_threaded.rs diff --git a/examples/single_threaded.rs b/examples/single_threaded.rs new file mode 100644 index 0000000000..1105f060d2 --- /dev/null +++ b/examples/single_threaded.rs @@ -0,0 +1,55 @@ +#![feature(async_await)] +#![deny(warnings)] + +use std::cell::Cell; +use std::rc::Rc; + +use hyper::{Body, Error, Response, Server}; +use hyper::service::{make_service_fn, service_fn}; +use tokio::runtime::current_thread; + +// Configure a runtime that runs everything on the current thread, +// which means it can spawn !Send futures... +#[hyper::rt::main(single_thread)] +async fn main() { + pretty_env_logger::init(); + + let addr = ([127, 0, 0, 1], 3000).into(); + + // Using a !Send request counter is fine on 1 thread... + let counter = Rc::new(Cell::new(0)); + + let make_service = make_service_fn(move |_| { + // For each connection, clone the counter to use in our service... + let cnt = counter.clone(); + + async move { + Ok::<_, Error>(service_fn(move |_| { + let prev = cnt.get(); + cnt.set(prev + 1); + let value = cnt.get(); + async move { + Ok::<_, Error>(Response::new(Body::from( + format!("Request #{}", value) + ))) + } + })) + } + }); + + // Since the Server needs to spawn some background tasks, we needed + // to configure an Executor that can spawn !Send futures... + let exec = current_thread::TaskExecutor::current(); + + let server = Server::bind(&addr) + .executor(exec) + .serve(make_service); + + println!("Listening on http://{}", addr); + + // The server would block on current thread to await !Send futures. + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } +} + diff --git a/examples_disabled/single_threaded.rs b/examples_disabled/single_threaded.rs deleted file mode 100644 index d39158bc45..0000000000 --- a/examples_disabled/single_threaded.rs +++ /dev/null @@ -1,51 +0,0 @@ -#![deny(warnings)] -extern crate futures; -extern crate hyper; -extern crate pretty_env_logger; -extern crate tokio; - -use std::cell::Cell; -use std::rc::Rc; - -use hyper::{Body, Response, Server}; -use hyper::service::service_fn_ok; -use hyper::rt::Future; -use tokio::runtime::current_thread; - -fn main() { - pretty_env_logger::init(); - - let addr = ([127, 0, 0, 1], 3000).into(); - - // Using a !Send request counter is fine on 1 thread... - let counter = Rc::new(Cell::new(0)); - - let new_service = move || { - // For each connection, clone the counter to use in our service... - let cnt = counter.clone(); - - service_fn_ok(move |_| { - let prev = cnt.get(); - cnt.set(prev + 1); - Response::new(Body::from(format!("Request count: {}", prev + 1))) - }) - }; - - // Since the Server needs to spawn some background tasks, we needed - // to configure an Executor that can spawn !Send futures... - let exec = current_thread::TaskExecutor::current(); - - let server = Server::bind(&addr) - .executor(exec) - .serve(new_service) - .map_err(|e| eprintln!("server error: {}", e)); - - println!("Listening on http://{}", addr); - - current_thread::Runtime::new() - .expect("rt new") - .spawn(server) - .run() - .expect("rt run"); -} - From c353d986152b461d3fbe3d68c09c4de82acd31d6 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Mon, 15 Jul 2019 09:40:30 +0800 Subject: [PATCH 4/4] docs(examples): update upgrade example to async/await --- examples/upgrades.rs | 154 ++++++++++++++++++++++++++++++++++ examples_disabled/upgrades.rs | 127 ---------------------------- 2 files changed, 154 insertions(+), 127 deletions(-) create mode 100644 examples/upgrades.rs delete mode 100644 examples_disabled/upgrades.rs diff --git a/examples/upgrades.rs b/examples/upgrades.rs new file mode 100644 index 0000000000..b79a8a5d97 --- /dev/null +++ b/examples/upgrades.rs @@ -0,0 +1,154 @@ +#![feature(async_await)] +#![deny(warnings)] + +// Note: `hyper::upgrade` docs link to this upgrade. +use std::str; + +use tokio::sync::oneshot; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; + +use hyper::{Body, Client, Request, Response, Server, StatusCode}; +use hyper::header::{UPGRADE, HeaderValue}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::upgrade::Upgraded; +use std::net::SocketAddr; + +// A simple type alias so as to DRY. +type Result = std::result::Result>; + +/// Handle server-side I/O after HTTP upgraded. +async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> { + // we have an upgraded connection that we can read and + // write on directly. + // + // since we completely control this example, we know exactly + // how many bytes the client will write, so just read exact... + let mut vec = vec![0; 7]; + upgraded.read_exact(&mut vec).await?; + println!("server[foobar] recv: {:?}", str::from_utf8(&vec)); + + // and now write back the server 'foobar' protocol's + // response... + upgraded.write_all(b"barr=foo").await?; + println!("server[foobar] sent"); + Ok(()) +} + +/// Our server HTTP handler to initiate HTTP upgrades. +async fn server_upgrade(req: Request) -> Result> { + let mut res = Response::new(Body::empty()); + + // Send a 400 to any request that doesn't have + // an `Upgrade` header. + if !req.headers().contains_key(UPGRADE) { + *res.status_mut() = StatusCode::BAD_REQUEST; + return Ok(res); + } + + // Setup a future that will eventually receive the upgraded + // connection and talk a new protocol, and spawn the future + // into the runtime. + // + // Note: This can't possibly be fulfilled until the 101 response + // is returned below, so it's better to spawn this future instead + // waiting for it to complete to then return a response. + hyper::rt::spawn(async move { + match req.into_body().on_upgrade().await { + Ok(upgraded) => { + if let Err(e) = server_upgraded_io(upgraded).await { + eprintln!("server foobar io error: {}", e) + }; + } + Err(e) => eprintln!("upgrade error: {}", e) + } + }); + + // Now return a 101 Response saying we agree to the upgrade to some + // made-up 'foobar' protocol. + *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar")); + Ok(res) +} + +/// Handle client-side I/O after HTTP upgraded. +async fn client_upgraded_io(mut upgraded: Upgraded) -> Result<()> { + // We've gotten an upgraded connection that we can read + // and write directly on. Let's start out 'foobar' protocol. + upgraded.write_all(b"foo=bar").await?; + println!("client[foobar] sent"); + + let mut vec = Vec::new(); + upgraded.read_to_end(&mut vec).await?; + println!("client[foobar] recv: {:?}", str::from_utf8(&vec)); + + Ok(()) +} + +/// Our client HTTP handler to initiate HTTP upgrades. +async fn client_upgrade_request(addr: SocketAddr) -> Result<()> { + let req = Request::builder() + .uri(format!("http://{}/", addr)) + .header(UPGRADE, "foobar") + .body(Body::empty()) + .unwrap(); + + let res = Client::new().request(req).await?; + if res.status() != StatusCode::SWITCHING_PROTOCOLS { + panic!("Our server didn't upgrade: {}", res.status()); + } + + match res.into_body().on_upgrade().await { + Ok(upgraded) => { + if let Err(e) = client_upgraded_io(upgraded).await { + eprintln!("client foobar io error: {}", e) + }; + } + Err(e) => eprintln!("upgrade error: {}", e) + } + + Ok(()) +} + +#[hyper::rt::main] +async fn main() { + // For this example, we just make a server and our own client to talk to + // it, so the exact port isn't important. Instead, let the OS give us an + // unused port. + let addr = ([127, 0, 0, 1], 0).into(); + + let make_service = make_service_fn(|_| async { + Ok::<_, hyper::Error>(service_fn(server_upgrade)) + }); + + let server = Server::bind(&addr) + .serve(make_service); + + // We need the assigned address for the client to send it messages. + let addr = server.local_addr(); + + // For this example, a oneshot is used to signal that after 1 request, + // the server should be shutdown. + let (tx, rx) = oneshot::channel::<()>(); + let server = server + .with_graceful_shutdown(async { + rx.await.ok(); + }); + + // Spawn server on the default executor, + // which is usually a thread-pool from tokio default runtime. + hyper::rt::spawn(async { + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } + }); + + // Client requests a HTTP connection upgrade. + let request = client_upgrade_request(addr.clone()); + if let Err(e) = request.await { + eprintln!("client error: {}", e); + } + + // Complete the oneshot so that the server stops + // listening and the process can close down. + let _ = tx.send(()); +} diff --git a/examples_disabled/upgrades.rs b/examples_disabled/upgrades.rs deleted file mode 100644 index 3cee4940a9..0000000000 --- a/examples_disabled/upgrades.rs +++ /dev/null @@ -1,127 +0,0 @@ -// Note: `hyper::upgrade` docs link to this upgrade. -extern crate futures; -extern crate hyper; -extern crate tokio; - -use std::str; - -use futures::sync::oneshot; - -use hyper::{Body, Client, Request, Response, Server, StatusCode}; -use hyper::header::{UPGRADE, HeaderValue}; -use hyper::rt::{self, Future}; -use hyper::service::service_fn_ok; - -/// Our server HTTP handler to initiate HTTP upgrades. -fn server_upgrade(req: Request) -> Response { - let mut res = Response::new(Body::empty()); - - // Send a 400 to any request that doesn't have - // an `Upgrade` header. - if !req.headers().contains_key(UPGRADE) { - *res.status_mut() = StatusCode::BAD_REQUEST; - return res; - } - - // Setup a future that will eventually receive the upgraded - // connection and talk a new protocol, and spawn the future - // into the runtime. - // - // Note: This can't possibly be fulfilled until the 101 response - // is returned below, so it's better to spawn this future instead - // waiting for it to complete to then return a response. - let on_upgrade = req - .into_body() - .on_upgrade() - .map_err(|err| eprintln!("upgrade error: {}", err)) - .and_then(|upgraded| { - // We have an upgraded connection that we can read and - // write on directly. - // - // Since we completely control this example, we know exactly - // how many bytes the client will write, so just read exact... - tokio::io::read_exact(upgraded, vec![0; 7]) - .and_then(|(upgraded, vec)| { - println!("server[foobar] recv: {:?}", str::from_utf8(&vec)); - - // And now write back the server 'foobar' protocol's - // response... - tokio::io::write_all(upgraded, b"bar=foo") - }) - .map(|_| println!("server[foobar] sent")) - .map_err(|e| eprintln!("server foobar io error: {}", e)) - }); - - rt::spawn(on_upgrade); - - - // Now return a 101 Response saying we agree to the upgrade to some - // made-up 'foobar' protocol. - *res.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar")); - res -} - -fn main() { - // For this example, we just make a server and our own client to talk to - // it, so the exact port isn't important. Instead, let the OS give us an - // unused port. - let addr = ([127, 0, 0, 1], 0).into(); - - let server = Server::bind(&addr) - .serve(|| service_fn_ok(server_upgrade)); - - // We need the assigned address for the client to send it messages. - let addr = server.local_addr(); - - - // For this example, a oneshot is used to signal that after 1 request, - // the server should be shutdown. - let (tx, rx) = oneshot::channel(); - - let server = server - .map_err(|e| eprintln!("server error: {}", e)) - .select2(rx) - .then(|_| Ok(())); - - rt::run(rt::lazy(move || { - rt::spawn(server); - - let req = Request::builder() - .uri(format!("http://{}/", addr)) - .header(UPGRADE, "foobar") - .body(Body::empty()) - .unwrap(); - - Client::new() - .request(req) - .and_then(|res| { - if res.status() != StatusCode::SWITCHING_PROTOCOLS { - panic!("Our server didn't upgrade: {}", res.status()); - } - - res - .into_body() - .on_upgrade() - }) - .map_err(|e| eprintln!("client error: {}", e)) - .and_then(|upgraded| { - // We've gotten an upgraded connection that we can read - // and write directly on. Let's start out 'foobar' protocol. - tokio::io::write_all(upgraded, b"foo=bar") - .and_then(|(upgraded, _)| { - println!("client[foobar] sent"); - tokio::io::read_to_end(upgraded, Vec::new()) - }) - .map(|(_upgraded, vec)| { - println!("client[foobar] recv: {:?}", str::from_utf8(&vec)); - - - // Complete the oneshot so that the server stops - // listening and the process can close down. - let _ = tx.send(()); - }) - .map_err(|e| eprintln!("client foobar io error: {}", e)) - }) - })); -}