Skip to content

Commit c353d98

Browse files
committed
docs(examples): update upgrade example to async/await
1 parent 3984682 commit c353d98

File tree

2 files changed

+154
-127
lines changed

2 files changed

+154
-127
lines changed

examples/upgrades.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
#![feature(async_await)]
2+
#![deny(warnings)]
3+
4+
// Note: `hyper::upgrade` docs link to this upgrade.
5+
use std::str;
6+
7+
use tokio::sync::oneshot;
8+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
9+
10+
use hyper::{Body, Client, Request, Response, Server, StatusCode};
11+
use hyper::header::{UPGRADE, HeaderValue};
12+
use hyper::service::{make_service_fn, service_fn};
13+
use hyper::upgrade::Upgraded;
14+
use std::net::SocketAddr;
15+
16+
// A simple type alias so as to DRY.
17+
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
18+
19+
/// Handle server-side I/O after HTTP upgraded.
20+
async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
21+
// we have an upgraded connection that we can read and
22+
// write on directly.
23+
//
24+
// since we completely control this example, we know exactly
25+
// how many bytes the client will write, so just read exact...
26+
let mut vec = vec![0; 7];
27+
upgraded.read_exact(&mut vec).await?;
28+
println!("server[foobar] recv: {:?}", str::from_utf8(&vec));
29+
30+
// and now write back the server 'foobar' protocol's
31+
// response...
32+
upgraded.write_all(b"barr=foo").await?;
33+
println!("server[foobar] sent");
34+
Ok(())
35+
}
36+
37+
/// Our server HTTP handler to initiate HTTP upgrades.
38+
async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
39+
let mut res = Response::new(Body::empty());
40+
41+
// Send a 400 to any request that doesn't have
42+
// an `Upgrade` header.
43+
if !req.headers().contains_key(UPGRADE) {
44+
*res.status_mut() = StatusCode::BAD_REQUEST;
45+
return Ok(res);
46+
}
47+
48+
// Setup a future that will eventually receive the upgraded
49+
// connection and talk a new protocol, and spawn the future
50+
// into the runtime.
51+
//
52+
// Note: This can't possibly be fulfilled until the 101 response
53+
// is returned below, so it's better to spawn this future instead
54+
// waiting for it to complete to then return a response.
55+
hyper::rt::spawn(async move {
56+
match req.into_body().on_upgrade().await {
57+
Ok(upgraded) => {
58+
if let Err(e) = server_upgraded_io(upgraded).await {
59+
eprintln!("server foobar io error: {}", e)
60+
};
61+
}
62+
Err(e) => eprintln!("upgrade error: {}", e)
63+
}
64+
});
65+
66+
// Now return a 101 Response saying we agree to the upgrade to some
67+
// made-up 'foobar' protocol.
68+
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
69+
res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar"));
70+
Ok(res)
71+
}
72+
73+
/// Handle client-side I/O after HTTP upgraded.
74+
async fn client_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
75+
// We've gotten an upgraded connection that we can read
76+
// and write directly on. Let's start out 'foobar' protocol.
77+
upgraded.write_all(b"foo=bar").await?;
78+
println!("client[foobar] sent");
79+
80+
let mut vec = Vec::new();
81+
upgraded.read_to_end(&mut vec).await?;
82+
println!("client[foobar] recv: {:?}", str::from_utf8(&vec));
83+
84+
Ok(())
85+
}
86+
87+
/// Our client HTTP handler to initiate HTTP upgrades.
88+
async fn client_upgrade_request(addr: SocketAddr) -> Result<()> {
89+
let req = Request::builder()
90+
.uri(format!("http://{}/", addr))
91+
.header(UPGRADE, "foobar")
92+
.body(Body::empty())
93+
.unwrap();
94+
95+
let res = Client::new().request(req).await?;
96+
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
97+
panic!("Our server didn't upgrade: {}", res.status());
98+
}
99+
100+
match res.into_body().on_upgrade().await {
101+
Ok(upgraded) => {
102+
if let Err(e) = client_upgraded_io(upgraded).await {
103+
eprintln!("client foobar io error: {}", e)
104+
};
105+
}
106+
Err(e) => eprintln!("upgrade error: {}", e)
107+
}
108+
109+
Ok(())
110+
}
111+
112+
#[hyper::rt::main]
113+
async fn main() {
114+
// For this example, we just make a server and our own client to talk to
115+
// it, so the exact port isn't important. Instead, let the OS give us an
116+
// unused port.
117+
let addr = ([127, 0, 0, 1], 0).into();
118+
119+
let make_service = make_service_fn(|_| async {
120+
Ok::<_, hyper::Error>(service_fn(server_upgrade))
121+
});
122+
123+
let server = Server::bind(&addr)
124+
.serve(make_service);
125+
126+
// We need the assigned address for the client to send it messages.
127+
let addr = server.local_addr();
128+
129+
// For this example, a oneshot is used to signal that after 1 request,
130+
// the server should be shutdown.
131+
let (tx, rx) = oneshot::channel::<()>();
132+
let server = server
133+
.with_graceful_shutdown(async {
134+
rx.await.ok();
135+
});
136+
137+
// Spawn server on the default executor,
138+
// which is usually a thread-pool from tokio default runtime.
139+
hyper::rt::spawn(async {
140+
if let Err(e) = server.await {
141+
eprintln!("server error: {}", e);
142+
}
143+
});
144+
145+
// Client requests a HTTP connection upgrade.
146+
let request = client_upgrade_request(addr.clone());
147+
if let Err(e) = request.await {
148+
eprintln!("client error: {}", e);
149+
}
150+
151+
// Complete the oneshot so that the server stops
152+
// listening and the process can close down.
153+
let _ = tx.send(());
154+
}

examples_disabled/upgrades.rs

Lines changed: 0 additions & 127 deletions
This file was deleted.

0 commit comments

Comments
 (0)