From d06596e21c8f7670f144e7d5fc83488d826e711c Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 19 Aug 2019 14:03:05 -0700 Subject: [PATCH] refactor(http2): re-enable http2 client and server support --- Cargo.toml | 2 +- benches/end_to_end.rs | 10 -- src/body/body.rs | 37 ++--- src/client/conn.rs | 84 +++-------- src/client/mod.rs | 8 +- src/proto/h2/client.rs | 308 ++++++++++++++++++++--------------------- src/proto/h2/mod.rs | 58 ++++---- src/proto/h2/server.rs | 165 +++++++++++----------- src/server/conn.rs | 10 +- src/server/mod.rs | 3 + src/server/shutdown.rs | 3 + 11 files changed, 321 insertions(+), 367 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f0a6e31e7..272bcc1836 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ futures-util-preview = { version = "0.3.0-alpha.17" } http = "0.1.15" http-body = "0.1" httparse = "1.0" -h2 = "0.1.10" +h2 = { git = "https://github.com/hyperium/h2" } iovec = "0.1" itoa = "0.4.1" log = "0.4" diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index d16a9563e2..59679e3df8 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -64,18 +64,14 @@ fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_get(b: &mut test::Bencher) { - // FIXME: re-implement tests when `h2` upgrades to `async/await` opts() .http2() .bench(b) } #[bench] -#[ignore] fn http2_post(b: &mut test::Bencher) { - // FIXME: re-implement tests when `h2` upgrades to `async/await` opts() .http2() .method(Method::POST) @@ -84,9 +80,7 @@ fn http2_post(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_req_100kb(b: &mut test::Bencher) { - // FIXME: re-implement tests when `h2` upgrades to `async/await` let body = &[b'x'; 1024 * 100]; opts() .http2() @@ -96,9 +90,7 @@ fn http2_req_100kb(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_parallel_x10_empty(b: &mut test::Bencher) { - // FIXME: re-implement tests when `h2` upgrades to `async/await` opts() .http2() .parallel(10) @@ -106,9 +98,7 @@ fn http2_parallel_x10_empty(b: &mut test::Bencher) { } #[bench] -#[ignore] fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) { - // FIXME: re-implement tests when `h2` upgrades to `async/await` let body = &[b'x'; 1024 * 1024 * 10]; opts() .http2() diff --git a/src/body/body.rs b/src/body/body.rs index 388dccb4a4..3671930cce 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -269,25 +269,17 @@ impl Body { } None => Poll::Ready(None), } - } + }, Kind::H2 { - /*recv: ref mut h2,*/ .. - } => { - unimplemented!("h2.poll_inner"); - /* - h2 - .poll() - .map(|r#async| { - r#async.map(|opt| { - opt.map(|bytes| { - let _ = h2.release_capacity().release_capacity(bytes.len()); - Chunk::from(bytes) - }) - }) - }) - .map_err(crate::Error::new_body) - */ - } + recv: ref mut h2, .. + } => match ready!(Pin::new(&mut *h2).poll_next(cx)) { + Some(Ok(bytes)) => { + let _ = h2.release_capacity().release_capacity(bytes.len()); + Poll::Ready(Some(Ok(Chunk::from(bytes)))) + }, + Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), + None => Poll::Ready(None), + }, Kind::Wrapped(ref mut s) => { match ready!(s.as_mut().poll_next(cx)) { Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), @@ -314,11 +306,12 @@ impl Payload for Body { self.poll_eof(cx) } - fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>> { + fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll>> { match self.kind { - Kind::H2 { /*recv: ref mut h2,*/ .. 
} => { - unimplemented!("h2.poll_trailers"); - //h2.poll_trailers().map_err(crate::Error::new_h2) + Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) { + Some(Ok(t)) => Poll::Ready(Some(Ok(t))), + Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))), + None => Poll::Ready(None), }, _ => Poll::Ready(None), } diff --git a/src/client/conn.rs b/src/client/conn.rs index e171f31aa0..203ff41608 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -32,18 +32,19 @@ type Http1Dispatcher = proto::dispatch::Dispatcher< >; type ConnEither = Either< Http1Dispatcher, - proto::h2::Client, + proto::h2::ClientTask, >; -/// Returns a `Handshake` future over some IO. +/// Returns a handshake future over some IO. /// /// This is a shortcut for `Builder::new().handshake(io)`. -pub fn handshake(io: T) -> Handshake +pub async fn handshake(io: T) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { Builder::new() .handshake(io) + .await } /// The sender side of an established connection. @@ -68,7 +69,7 @@ where /// A builder to configure an HTTP connection. /// -/// After setting options, the builder is used to create a `Handshake` future. +/// After setting options, the builder is used to create a handshake future. #[derive(Clone, Debug)] pub struct Builder { pub(super) exec: Exec, @@ -80,16 +81,6 @@ pub struct Builder { h2_builder: h2::client::Builder, } -/// A future setting up HTTP over an IO object. -/// -/// If successful, yields a `(SendRequest, Connection)` pair. -#[must_use = "futures do nothing unless polled"] -pub struct Handshake { - builder: Builder, - io: Option, - _marker: PhantomData, -} - /// A future returned by `SendRequest::send_request`. /// /// Yields a `Response` if successful. @@ -334,7 +325,8 @@ impl Clone for Http2SendRequest { impl Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Payload + 'static, + B: Payload + Unpin + 'static, + B::Data: Unpin, { /// Return the inner IO object, and additional information. /// @@ -365,29 +357,20 @@ where /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html) /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html) /// to work with this function; or use the `without_shutdown` wrapper. - pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> - where - B: Unpin, - { + pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll> { match self.inner.as_mut().expect("already upgraded") { &mut Either::Left(ref mut h1) => { h1.poll_without_shutdown(cx) }, &mut Either::Right(ref mut h2) => { - unimplemented!("h2 poll_without_shutdown"); - /* - h2.poll().map(|x| x.map(|_| ())) - */ + Pin::new(h2).poll(cx).map_ok(|_| ()) } } } /// Prevent shutdown of the underlying IO object at the end of service the request, /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`. - pub fn without_shutdown(self) -> impl Future>> - where - B: Unpin, - { + pub fn without_shutdown(self) -> impl Future>> { let mut conn = Some(self); future::poll_fn(move |cx| -> Poll>> { ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; @@ -400,6 +383,7 @@ impl Future for Connection where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Payload + Unpin + 'static, + B::Data: Unpin, { type Output = crate::Result<()>; @@ -522,70 +506,46 @@ impl Builder { } /// Constructs a connection with the configured options and IO. 
- #[inline] - pub fn handshake(&self, io: T) -> Handshake + pub async fn handshake(self, io: T) -> crate::Result<(SendRequest, Connection)> where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Payload + 'static, + B::Data: Unpin, { trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 }); - Handshake { - builder: self.clone(), - io: Some(io), - _marker: PhantomData, - } - } -} - -// ===== impl Handshake - -impl Future for Handshake -where - T: AsyncRead + AsyncWrite + Unpin + Send + 'static, - B: Payload + 'static, -{ - type Output = crate::Result<(SendRequest, Connection)>; - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - let io = self.io.take().expect("polled more than once"); let (tx, rx) = dispatch::channel(); - let either = if !self.builder.http2 { + let either = if !self.http2 { let mut conn = proto::Conn::new(io); - if !self.builder.h1_writev { + if !self.h1_writev { conn.set_write_strategy_flatten(); } - if self.builder.h1_title_case_headers { + if self.h1_title_case_headers { conn.set_title_case_headers(); } - if let Some(sz) = self.builder.h1_read_buf_exact_size { + if let Some(sz) = self.h1_read_buf_exact_size { conn.set_read_buf_exact_size(sz); } - if let Some(max) = self.builder.h1_max_buf_size { + if let Some(max) = self.h1_max_buf_size { conn.set_max_buf_size(max); } let cd = proto::h1::dispatch::Client::new(rx); let dispatch = proto::h1::Dispatcher::new(cd, conn); Either::Left(dispatch) } else { - let h2 = proto::h2::Client::new(io, rx, &self.builder.h2_builder, self.builder.exec.clone()); + let h2 = proto::h2::client::handshake(io, rx, &self.h2_builder, self.exec.clone()) + .await?; Either::Right(h2) }; - Poll::Ready(Ok(( + Ok(( SendRequest { dispatch: tx, }, Connection { inner: Some(either), }, - ))) - } -} - -impl fmt::Debug for Handshake { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Handshake") - .finish() + )) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 6309d9999c..f2dc8cff4a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -163,7 +163,7 @@ where C: Connect + Sync + 'static, C::Transport: 'static, C::Future: 'static, B: Payload + Unpin + Send + 'static, - B::Data: Send, + B::Data: Send + Unpin, { /// Send a `GET` request to the supplied `Uri`. /// @@ -512,8 +512,10 @@ where C: Connect + Sync + 'static, connecting }; let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; - Either::Left(conn_builder + Either::Left(Box::pin(conn_builder .http2_only(is_h2) + // TODO: convert client::conn::Builder to be by-value? 
+ .clone() .handshake(io) .and_then(move |(tx, conn)| { trace!("handshake complete, spawning background dispatcher task"); @@ -541,7 +543,7 @@ where C: Connect + Sync + 'static, PoolTx::Http1(tx) }, }) - })) + }))) })) }) } diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 9e01321c13..5a8e9d2668 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,8 +1,11 @@ use bytes::IntoBuf; -//use futures::{Async, Future, Poll, Stream}; +use futures_channel::{mpsc, oneshot}; +use futures_util::future::{self, FutureExt as _, Either}; +use futures_util::stream::StreamExt as _; +use futures_util::try_future::TryFutureExt as _; //use futures::future::{self, Either}; //use futures::sync::{mpsc, oneshot}; -use h2::client::{Builder, Handshake, SendRequest}; +use h2::client::{Builder, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; use crate::headers::content_length_parse_all; @@ -14,192 +17,187 @@ use super::{PipeToSendStream, SendBuf}; use crate::{Body, Request, Response}; type ClientRx = crate::client::dispatch::Receiver, Response>; + ///// An mpsc channel is used to help notify the `Connection` task when *all* ///// other handles to it have been dropped, so that it can shutdown. -//type ConnDropRef = mpsc::Sender; +type ConnDropRef = mpsc::Sender; ///// A oneshot channel watches the `Connection` task, and when it completes, ///// the "dispatch" task will be notified and can shutdown sooner. -//type ConnEof = oneshot::Receiver; +type ConnEof = oneshot::Receiver; -pub(crate) struct Client +pub(crate) async fn handshake( + io: T, + req_rx: ClientRx, + builder: &Builder, + exec: Exec, +) -> crate::Result> where + T: AsyncRead + AsyncWrite + Send + Unpin + 'static, B: Payload, + B::Data: Unpin, { - executor: Exec, - rx: ClientRx, - state: State>, -} + let (h2_tx, conn) = builder + .handshake::<_, SendBuf>(io) + .await + .map_err(crate::Error::new_h2)?; + + // An mpsc channel is used entirely to detect when the + // 'Client' has been dropped. This is to get around a bug + // in h2 where dropping all SendRequests won't notify a + // parked Connection. 
+ let (conn_drop_ref, rx) = mpsc::channel(1); + let (cancel_tx, conn_eof) = oneshot::channel(); + + let conn_drop_rx = rx.into_future() + .map(|(item, _rx)| { + match item { + Some(never) => match never {}, + None => (), + } + }); + + let conn = conn.map_err(|e| debug!("connection error: {}", e)); + + let conn_task = async move { + match future::select(conn, conn_drop_rx).await { + Either::Left(_) => { + // ok or err, the `conn` has finished + } + Either::Right(((), conn)) => { + // mpsc has been dropped, hopefully polling + // the connection some more should start shutdown + // and then close + trace!("send_request dropped, starting conn shutdown"); + drop(cancel_tx); + let _ = conn.await; + } + } + }; + + exec.execute(conn_task)?; -enum State where B: IntoBuf { - Handshaking(Handshake), - //Ready(SendRequest, ConnDropRef, ConnEof), + Ok(ClientTask { + conn_drop_ref, + conn_eof, + executor: exec, + h2_tx, + req_rx, + }) } -impl Client +pub(crate) struct ClientTask where - T: AsyncRead + AsyncWrite + Send + 'static, B: Payload, { - pub(crate) fn new(io: T, rx: ClientRx, builder: &Builder, exec: Exec) -> Client { - unimplemented!("proto::h2::Client::new"); - /* - let handshake = builder.handshake(io); - - Client { - executor: exec, - rx: rx, - state: State::Handshaking(handshake), - } - */ - } + conn_drop_ref: ConnDropRef, + conn_eof: ConnEof, + executor: Exec, + h2_tx: SendRequest>, + req_rx: ClientRx, } -impl Future for Client +impl Future for ClientTask where - T: AsyncRead + AsyncWrite + Send + 'static, - B: Payload + 'static, + B: Payload + Unpin + 'static, + B::Data: Unpin, { type Output = crate::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - unimplemented!("impl Future for proto::h2::Client"); - /* loop { - let next = match self.state { - State::Handshaking(ref mut h) => { - let (request_tx, conn) = try_ready!(h.poll().map_err(crate::Error::new_h2)); - // An mpsc channel is used entirely to detect when the - // 'Client' has been dropped. This is to get around a bug - // in h2 where dropping all SendRequests won't notify a - // parked Connection. 
- let (tx, rx) = mpsc::channel(0); - let (cancel_tx, cancel_rx) = oneshot::channel(); - let rx = rx.into_future() - .map(|(msg, _)| match msg { - Some(never) => match never {}, - None => (), - }) - .map_err(|_| -> Never { unreachable!("mpsc cannot error") }); - let fut = conn - .inspect(move |_| { - drop(cancel_tx); - trace!("connection complete") - }) - .map_err(|e| debug!("connection error: {}", e)) - .select2(rx) - .then(|res| match res { - Ok(Either::A(((), _))) | - Err(Either::A(((), _))) => { - // conn has finished either way - Either::A(future::ok(())) - }, - Ok(Either::B(((), conn))) => { - // mpsc has been dropped, hopefully polling - // the connection some more should start shutdown - // and then close - trace!("send_request dropped, starting conn shutdown"); - Either::B(conn) - } - Err(Either::B((never, _))) => match never {}, - }); - self.executor.execute(fut)?; - State::Ready(request_tx, tx, cancel_rx) - }, - State::Ready(ref mut tx, ref conn_dropper, ref mut cancel_rx) => { - match tx.poll_ready() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => return Ok(Async::NotReady), + match ready!(self.h2_tx.poll_ready(cx)) { + Ok(()) => (), + Err(err) => { + return if err.reason() == Some(::h2::Reason::NO_ERROR) { + trace!("connection gracefully shutdown"); + Poll::Ready(Ok(Dispatched::Shutdown)) + } else { + Poll::Ready(Err(crate::Error::new_h2(err))) + }; + } + }; + + match Pin::new(&mut self.req_rx).poll_next(cx) { + Poll::Ready(Some((req, cb))) => { + // check that future hasn't been canceled already + if cb.is_canceled() { + trace!("request callback is canceled"); + continue; + } + let (head, body) = req.into_parts(); + let mut req = ::http::Request::from_parts(head, ()); + super::strip_connection_headers(req.headers_mut(), true); + if let Some(len) = body.content_length() { + headers::set_content_length_if_missing(req.headers_mut(), len); + } + let eos = body.is_end_stream(); + let (fut, body_tx) = match self.h2_tx.send_request(req, eos) { + Ok(ok) => ok, Err(err) => { - return if err.reason() == Some(::h2::Reason::NO_ERROR) { - trace!("connection gracefully shutdown"); - Ok(Async::Ready(Dispatched::Shutdown)) - } else { - Err(crate::Error::new_h2(err)) - }; + debug!("client send request error: {}", err); + cb.send(Err((crate::Error::new_h2(err), None))); + continue; } - } - match self.rx.poll() { - Ok(Async::Ready(Some((req, cb)))) => { - // check that future hasn't been canceled already - if cb.is_canceled() { - trace!("request callback is canceled"); - continue; - } - let (head, body) = req.into_parts(); - let mut req = ::http::Request::from_parts(head, ()); - super::strip_connection_headers(req.headers_mut(), true); - if let Some(len) = body.content_length() { - headers::set_content_length_if_missing(req.headers_mut(), len); - } - let eos = body.is_end_stream(); - let (fut, body_tx) = match tx.send_request(req, eos) { - Ok(ok) => ok, - Err(err) => { - debug!("client send request error: {}", err); - cb.send(Err((crate::Error::new_h2(err), None))); - continue; - } - }; - if !eos { - let mut pipe = PipeToSendStream::new(body, body_tx) - .map_err(|e| debug!("client request body error: {}", e)); - - // eagerly see if the body pipe is ready and - // can thus skip allocating in the executor - match pipe.poll() { - Ok(Async::Ready(())) | Err(()) => (), - Ok(Async::NotReady) => { - let conn_drop_ref = conn_dropper.clone(); - let pipe = pipe.then(move |x| { - drop(conn_drop_ref); - x - }); - self.executor.execute(pipe)?; - } + }; + + if !eos { + let mut pipe = 
PipeToSendStream::new(body, body_tx) + .map(|res| { + if let Err(e) = res { + debug!("client request body error: {}", e); } + }); + + // eagerly see if the body pipe is ready and + // can thus skip allocating in the executor + match Pin::new(&mut pipe).poll(cx) { + Poll::Ready(_) => (), + Poll::Pending => { + let conn_drop_ref = self.conn_drop_ref.clone(); + let pipe = pipe.map(move |x| { + drop(conn_drop_ref); + x + }); + self.executor.execute(pipe)?; } + } + } - let fut = fut - .then(move |result| { - match result { - Ok(res) => { - let content_length = content_length_parse_all(res.headers()); - let res = res.map(|stream| - crate::Body::h2(stream, content_length)); - Ok(res) - }, - Err(err) => { - debug!("client response error: {}", err); - Err((crate::Error::new_h2(err), None)) - } - } - }); - self.executor.execute(cb.send_when(fut))?; - continue; - }, - - Ok(Async::NotReady) => { - match cancel_rx.poll() { - Ok(Async::Ready(never)) => match never {}, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_conn_is_eof) => { - trace!("connection task is closed, closing dispatch task"); - return Ok(Async::Ready(Dispatched::Shutdown)); + let fut = fut + .map(move |result| { + match result { + Ok(res) => { + let content_length = content_length_parse_all(res.headers()); + let res = res.map(|stream| + crate::Body::h2(stream, content_length)); + Ok(res) + }, + Err(err) => { + debug!("client response error: {}", err); + Err((crate::Error::new_h2(err), None)) } } - }, + }); + self.executor.execute(cb.send_when(fut))?; + continue; + }, + + Poll::Ready(None) => { + trace!("client::dispatch::Sender dropped"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } - Ok(Async::Ready(None)) => { - trace!("client::dispatch::Sender dropped"); - return Ok(Async::Ready(Dispatched::Shutdown)); - }, - Err(never) => match never {}, + Poll::Pending => { + match ready!(Pin::new(&mut self.conn_eof).poll(cx)) { + Ok(never) => match never {}, + Err(_conn_is_eof) => { + trace!("connection task is closed, closing dispatch task"); + return Poll::Ready(Ok(Dispatched::Shutdown)); + } } }, - }; - self.state = next; + } } - */ } } diff --git a/src/proto/h2/mod.rs b/src/proto/h2/mod.rs index 3cb9c4b9ea..696fb01596 100644 --- a/src/proto/h2/mod.rs +++ b/src/proto/h2/mod.rs @@ -1,5 +1,4 @@ use bytes::Buf; -//use futures::{Async, Future, Poll}; use h2::{SendStream}; use http::header::{ HeaderName, CONNECTION, PROXY_AUTHENTICATE, PROXY_AUTHORIZATION, TE, TRAILER, @@ -8,11 +7,12 @@ use http::header::{ use http::HeaderMap; use crate::body::Payload; +use crate::common::{Future, Pin, Poll, task}; -mod client; +pub(crate) mod client; pub(crate) mod server; -pub(crate) use self::client::Client; +pub(crate) use self::client::ClientTask; pub(crate) use self::server::Server; fn strip_connection_headers(headers: &mut HeaderMap, is_request: bool) { @@ -106,17 +106,13 @@ where } } -/* impl Future for PipeToSendStream where - S: Payload, + S: Payload + Unpin, { - type Item = (); - type Error = crate::Error; + type Output = crate::Result<()>; - fn poll(&mut self) -> Poll { - unimplemented!("impl Future for PipeToSendStream"); - /* + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { loop { if !self.data_done { // we don't have the next chunk of data yet, so just reserve 1 byte to make @@ -126,23 +122,25 @@ where if self.body_tx.capacity() == 0 { loop { - match try_ready!(self.body_tx.poll_capacity().map_err(crate::Error::new_body_write)) { - Some(0) => {} - Some(_) => break, - None => return 
Err(crate::Error::new_canceled()), + match ready!(self.body_tx.poll_capacity(cx)) { + + Some(Ok(0)) => {}, + Some(Ok(_)) => break, + Some(Err(e)) => return Poll::Ready(Err(crate::Error::new_body_write(e))) , + None => return Poll::Ready(Err(crate::Error::new_canceled())), } } } else { - if let Async::Ready(reason) = - self.body_tx.poll_reset().map_err(crate::Error::new_body_write)? + if let Poll::Ready(reason) = + self.body_tx.poll_reset(cx).map_err(crate::Error::new_body_write)? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(crate::Error::new_body_write(::h2::Error::from(reason))); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); } } - match try_ready!(self.stream.poll_data().map_err(|e| self.on_user_err(e))) { - Some(chunk) => { + match ready!(Pin::new(&mut self.stream).poll_data(cx)) { + Some(Ok(chunk)) => { let is_eos = self.stream.is_end_stream(); trace!( "send body chunk: {} bytes, eos={}", @@ -156,14 +154,15 @@ where .map_err(crate::Error::new_body_write)?; if is_eos { - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } } + Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), None => { self.body_tx.reserve_capacity(0); let is_eos = self.stream.is_end_stream(); if is_eos { - return self.send_eos_frame().map(Async::Ready); + return Poll::Ready(self.send_eos_frame()); } else { self.data_done = true; // loop again to poll_trailers @@ -171,31 +170,30 @@ where } } } else { - if let Async::Ready(reason) = - self.body_tx.poll_reset().map_err(|e| crate::Error::new_body_write(e))? + if let Poll::Ready(reason) = + self.body_tx.poll_reset(cx).map_err(|e| crate::Error::new_body_write(e))? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(crate::Error::new_body_write(::h2::Error::from(reason))); + return Poll::Ready(Err(crate::Error::new_body_write(::h2::Error::from(reason)))); } - match try_ready!(self.stream.poll_trailers().map_err(|e| self.on_user_err(e))) { - Some(trailers) => { + match ready!(Pin::new(&mut self.stream).poll_trailers(cx)) { + Some(Ok(trailers)) => { self.body_tx .send_trailers(trailers) .map_err(crate::Error::new_body_write)?; - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } + Some(Err(e)) => return Poll::Ready(Err(self.on_user_err(e))), None => { // There were no trailers, so send an empty DATA frame... - return self.send_eos_frame().map(Async::Ready); + return Poll::Ready(self.send_eos_frame()); } } } } - */ } } -*/ struct SendBuf(Option); diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 00ce730b79..1367845458 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -1,5 +1,7 @@ use std::error::Error as StdError; +use std::marker::Unpin; +use futures_core::Stream; use h2::Reason; use h2::server::{Builder, Connection, Handshake, SendResponse}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -49,27 +51,23 @@ where impl Server where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into>, B: Payload, + B::Data: Unpin, E: H2Exec, { pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server { - unimplemented!("proto::h2::Server::new") - /* let handshake = builder.handshake(io); Server { exec, state: State::Handshaking(handshake), service, } - */ } pub fn graceful_shutdown(&mut self) { - unimplemented!("proto::h2::Server::graceful_shutdown") - /* trace!("graceful_shutdown"); match self.state { State::Handshaking(..) 
=> { @@ -86,54 +84,53 @@ where } } self.state = State::Closed; - */ } } impl Future for Server where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, S: Service, S::Error: Into>, B: Payload, + B::Data: Unpin, E: H2Exec, { type Output = crate::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - unimplemented!("h2 server future") - /* + let me = &mut *self; loop { - let next = match self.state { + let next = match me.state { State::Handshaking(ref mut h) => { - let conn = try_ready!(h.poll().map_err(crate::Error::new_h2)); + let conn = ready!(Pin::new(h).poll(cx).map_err(crate::Error::new_h2))?; State::Serving(Serving { conn, closing: None, }) }, State::Serving(ref mut srv) => { - try_ready!(srv.poll_server(&mut self.service, &self.exec)); - return Ok(Async::Ready(Dispatched::Shutdown)); + ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?; + return Poll::Ready(Ok(Dispatched::Shutdown)); } State::Closed => { // graceful_shutdown was called before handshaking finished, // nothing to do here... - return Ok(Async::Ready(Dispatched::Shutdown)); + return Poll::Ready(Ok(Dispatched::Shutdown)); } }; - self.state = next; + me.state = next; } - */ } } impl Serving where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, B: Payload, + B::Data: Unpin, { - fn poll_server(&mut self, service: &mut S, exec: &E) -> Poll> + fn poll_server(&mut self, cx: &mut task::Context<'_>, service: &mut S, exec: &mut E) -> Poll> where S: Service< ReqBody=Body, @@ -142,19 +139,18 @@ where S::Error: Into>, E: H2Exec, { - /* if self.closing.is_none() { loop { // At first, polls the readiness of supplied service. - match service.poll_ready() { - Ok(Async::Ready(())) => (), - Ok(Async::NotReady) => { + match service.poll_ready(cx) { + Poll::Ready(Ok(())) => (), + Poll::Pending => { // use `poll_close` instead of `poll`, in order to avoid accepting a request. - try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2)); + ready!(self.conn.poll_close(cx).map_err(crate::Error::new_h2))?; trace!("incoming connection complete"); - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } - Err(err) => { + Poll::Ready(Err(err)) => { let err = crate::Error::new_user_service(err); debug!("service closed: {}", err); @@ -173,29 +169,33 @@ where } // When the service is ready, accepts an incoming request. - if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(crate::Error::new_h2)) { - trace!("incoming request"); - let content_length = content_length_parse_all(req.headers()); - let req = req.map(|stream| { - crate::Body::h2(stream, content_length) - }); - let fut = H2Stream::new(service.call(req), respond); - exec.execute_h2stream(fut)?; - } else { - // no more incoming streams... - trace!("incoming connection complete"); - return Ok(Async::Ready(())) + match ready!(Pin::new(&mut self.conn).poll_next(cx)) { + Some(Ok((req, respond))) => { + trace!("incoming request"); + let content_length = content_length_parse_all(req.headers()); + let req = req.map(|stream| { + crate::Body::h2(stream, content_length) + }); + let fut = H2Stream::new(service.call(req), respond); + exec.execute_h2stream(fut)?; + }, + Some(Err(e)) => { + return Poll::Ready(Err(crate::Error::new_h2(e))); + }, + None => { + // no more incoming streams... 
+ trace!("incoming connection complete"); + return Poll::Ready(Ok(())); + }, } } } debug_assert!(self.closing.is_some(), "poll_server broke loop without closing"); - try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2)); + ready!(self.conn.poll_close(cx).map_err(crate::Error::new_h2))?; - Err(self.closing.take().expect("polled after error")) - */ - unimplemented!("h2 server poll_server") + Poll::Ready(Err(self.closing.take().expect("polled after error"))) } } @@ -230,38 +230,37 @@ where } } -impl Future for H2Stream +impl H2Stream where - //F: Future>, - //F::Error: Into>, - B: Payload, + F: Future, E>>, + B: Payload + Unpin, + B::Data: Unpin, + E: Into>, { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - unimplemented!("impl Future for H2Stream"); - /* + fn poll2(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + // Safety: State::{Service, Body} futures are never moved + let me = unsafe { self.get_unchecked_mut() }; loop { - let next = match self.state { + let next = match me.state { H2StreamState::Service(ref mut h) => { - let res = match h.poll() { - Ok(Async::Ready(r)) => r, - Ok(Async::NotReady) => { - // Body is not yet ready, so we want to check if the client has sent a + let res = match unsafe { Pin::new_unchecked(h) }.poll(cx) { + Poll::Ready(Ok(r)) => r, + Poll::Pending => { + // Response is not yet ready, so we want to check if the client has sent a // RST_STREAM frame which would cancel the current request. - if let Async::Ready(reason) = - self.reply.poll_reset().map_err(|e| crate::Error::new_h2(e))? + if let Poll::Ready(reason) = + me.reply.poll_reset(cx).map_err(|e| crate::Error::new_h2(e))? { debug!("stream received RST_STREAM: {:?}", reason); - return Err(crate::Error::new_h2(reason.into())); + return Poll::Ready(Err(crate::Error::new_h2(reason.into()))); } - return Ok(Async::NotReady); + return Poll::Pending; } - Err(e) => { + Poll::Ready(Err(e)) => { let err = crate::Error::new_user_service(e); warn!("http2 service errored: {}", err); - self.reply.send_reset(err.h2_reason()); - return Err(err); + me.reply.send_reset(err.h2_reason()); + return Poll::Ready(Err(err)); }, }; @@ -278,12 +277,12 @@ where macro_rules! 
reply { ($eos:expr) => ({ - match self.reply.send_response(res, $eos) { + match me.reply.send_response(res, $eos) { Ok(tx) => tx, Err(e) => { debug!("send response error: {}", e); - self.reply.send_reset(Reason::INTERNAL_ERROR); - return Err(crate::Error::new_h2(e)); + me.reply.send_reset(Reason::INTERNAL_ERROR); + return Poll::Ready(Err(crate::Error::new_h2(e))); } } }) @@ -300,7 +299,7 @@ where body_tx .send_data(buf, true) .map_err(crate::Error::new_body_write)?; - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } if !body.is_end_stream() { @@ -308,32 +307,32 @@ where H2StreamState::Body(PipeToSendStream::new(body, body_tx)) } else { reply!(true); - return Ok(Async::Ready(())); + return Poll::Ready(Ok(())); } }, H2StreamState::Body(ref mut pipe) => { - return pipe.poll(); + return Pin::new(pipe).poll(cx); } }; - self.state = next; + me.state = next; } - */ } } -/* -impl Future for H2Stream + +impl Future for H2Stream where - F: Future>, - F::Error: Into>, - B: Payload, + F: Future, E>>, + B: Payload + Unpin, + B::Data: Unpin, + E: Into>, { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { - self.poll2() - .map_err(|e| debug!("stream error: {}", e)) + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + self.poll2(cx).map(|res| { + if let Err(e) = res { + debug!("stream error: {}", e); + } + }) } } -*/ - diff --git a/src/server/conn.rs b/src/server/conn.rs index 95759be35e..11005d9cd0 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -358,6 +358,7 @@ impl Http { S: Service, S::Error: Into>, Bd: Payload, + Bd::Data: Unpin, I: AsyncRead + AsyncWrite + Unpin, E: H2Exec, { @@ -479,6 +480,7 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, + B::Data: Unpin, E: H2Exec, { /// Start a graceful shutdown process for this connection. @@ -549,7 +551,7 @@ where loop { let polled = match *self.conn.as_mut().unwrap() { Either::A(ref mut h1) => h1.poll_without_shutdown(cx), - Either::B(ref mut h2) => unimplemented!("Connection::poll_without_shutdown h2"),//return h2.poll().map(|x| x.map(|_| ())), + Either::B(ref mut h2) => return Pin::new(h2).poll(cx).map_ok(|_| ()), }; match ready!(polled) { Ok(x) => return Poll::Ready(Ok(x)), @@ -629,6 +631,7 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Payload + 'static, + B::Data: Unpin, E: H2Exec, { type Output = crate::Result<()>; @@ -744,6 +747,7 @@ where F: Future>, S: Service, B: Payload, + B::Data: Unpin, E: H2Exec, { type Output = Result, FE>; @@ -852,6 +856,7 @@ pub(crate) mod spawn_all { where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: Service + 'static, + ::Data: Unpin, E: H2Exec, { type Future = UpgradeableConnection; @@ -895,6 +900,7 @@ pub(crate) mod spawn_all { NE: Into>, S: Service, B: Payload, + B::Data: Unpin, E: H2Exec, W: Watcher, { @@ -960,6 +966,7 @@ mod upgrades { S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Payload + 'static, + B::Data: Unpin, E: H2Exec, { /// Start a graceful shutdown process for this connection. 
@@ -977,6 +984,7 @@ mod upgrades { S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Payload + 'static, + B::Data: Unpin, E: super::H2Exec, { type Output = crate::Result<()>; diff --git a/src/server/mod.rs b/src/server/mod.rs index 42b1e3ed39..af341809a8 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -154,6 +154,7 @@ where S::Error: Into>, S::Service: 'static, B: Payload, + B::Data: Unpin, E: H2Exec<::Future, B>, E: NewSvcExec, { @@ -211,6 +212,7 @@ where S::Error: Into>, S::Service: 'static, B: Payload, + B::Data: Unpin, E: H2Exec<::Future, B>, E: NewSvcExec, { @@ -409,6 +411,7 @@ impl Builder { S::Error: Into>, S::Service: 'static, B: Payload, + B::Data: Unpin, E: NewSvcExec, E: H2Exec<::Future, B>, { diff --git a/src/server/shutdown.rs b/src/server/shutdown.rs index c8e1019844..f47a70427e 100644 --- a/src/server/shutdown.rs +++ b/src/server/shutdown.rs @@ -47,6 +47,7 @@ where S::Service: 'static, S::Error: Into>, B: Payload, + B::Data: Unpin, F: Future, E: H2Exec<::Future, B>, E: NewSvcExec, @@ -98,6 +99,7 @@ impl Watcher for GracefulWatcher where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: Service + 'static, + ::Data: Unpin, E: H2Exec, { type Future = Watching, fn(Pin<&mut UpgradeableConnection>)>; @@ -116,6 +118,7 @@ where S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, S::ResBody: Payload + 'static, + ::Data: Unpin, E: H2Exec, { conn.graceful_shutdown()
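
Usage sketch (illustrative, not part of the patch): the client handshake is now an `async fn` that yields the `(SendRequest, Connection)` pair directly instead of a separate `Handshake` future type, and `Builder::handshake` consumes the builder (hence the `.clone()` and TODO in src/client/mod.rs). The sketch below shows how a caller might drive the reworked API; the tokio runtime/`tokio::spawn`, the boxed error type, the example URI, and the function name are assumptions for illustration only — the builder methods and trait bounds follow the patch.

    use hyper::{Body, Request};
    use hyper::client::conn;
    use tokio_io::{AsyncRead, AsyncWrite};

    // `io` is any already-established transport (e.g. a TCP or TLS stream).
    async fn send_one_request<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
    where
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        // `handshake` is async in this patch: await it to get the pair directly.
        let (mut send_request, connection) = conn::Builder::new()
            .http2_only(true)
            // Builder::handshake takes `self` by value here, so clone the
            // configured builder first (mirrors the call site in src/client/mod.rs).
            .clone()
            .handshake(io)
            .await?;

        // The Connection future must be polled to completion so it can
        // multiplex the HTTP/2 streams; run it as a background task.
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("connection error: {}", e);
            }
        });

        // Requests go through the SendRequest half.
        let req = Request::builder()
            .uri("http://example.com/")
            .body(Body::empty())?;
        let res = send_request.send_request(req).await?;
        println!("response status: {}", res.status());
        Ok(())
    }

The background task is what replaces the old `State::Handshaking`/`State::Ready` machinery: the patch spawns the h2 `Connection` (plus the mpsc drop-detection channel) inside `proto::h2::client::handshake`, so user code only has to keep the returned `Connection` future running.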