Skip to content

Re-enable HTTP2 support #1906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ futures-util-preview = { version = "0.3.0-alpha.17" }
http = "0.1.15"
http-body = "0.1"
httparse = "1.0"
h2 = "0.1.10"
h2 = { git = "https://github.com/hyperium/h2" }
iovec = "0.1"
itoa = "0.4.1"
log = "0.4"
Expand Down
10 changes: 0 additions & 10 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,14 @@ fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) {
}

#[bench]
#[ignore]
fn http2_get(b: &mut test::Bencher) {
// FIXME: re-implement tests when `h2` upgrades to `async/await`
opts()
.http2()
.bench(b)
}

#[bench]
#[ignore]
fn http2_post(b: &mut test::Bencher) {
// FIXME: re-implement tests when `h2` upgrades to `async/await`
opts()
.http2()
.method(Method::POST)
Expand All @@ -84,9 +80,7 @@ fn http2_post(b: &mut test::Bencher) {
}

#[bench]
#[ignore]
fn http2_req_100kb(b: &mut test::Bencher) {
// FIXME: re-implement tests when `h2` upgrades to `async/await`
let body = &[b'x'; 1024 * 100];
opts()
.http2()
Expand All @@ -96,19 +90,15 @@ fn http2_req_100kb(b: &mut test::Bencher) {
}

#[bench]
#[ignore]
fn http2_parallel_x10_empty(b: &mut test::Bencher) {
// FIXME: re-implement tests when `h2` upgrades to `async/await`
opts()
.http2()
.parallel(10)
.bench(b)
}

#[bench]
#[ignore]
fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) {
// FIXME: re-implement tests when `h2` upgrades to `async/await`
let body = &[b'x'; 1024 * 1024 * 10];
opts()
.http2()
Expand Down
37 changes: 15 additions & 22 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,25 +269,17 @@ impl Body {
}
None => Poll::Ready(None),
}
}
},
Kind::H2 {
/*recv: ref mut h2,*/ ..
} => {
unimplemented!("h2.poll_inner");
/*
h2
.poll()
.map(|r#async| {
r#async.map(|opt| {
opt.map(|bytes| {
let _ = h2.release_capacity().release_capacity(bytes.len());
Chunk::from(bytes)
})
})
})
.map_err(crate::Error::new_body)
*/
}
recv: ref mut h2, ..
} => match ready!(Pin::new(&mut *h2).poll_next(cx)) {
Some(Ok(bytes)) => {
let _ = h2.release_capacity().release_capacity(bytes.len());
Poll::Ready(Some(Ok(Chunk::from(bytes))))
},
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
None => Poll::Ready(None),
},
Kind::Wrapped(ref mut s) => {
match ready!(s.as_mut().poll_next(cx)) {
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
Expand All @@ -314,11 +306,12 @@ impl Payload for Body {
self.poll_eof(cx)
}

fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> {
fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> {
match self.kind {
Kind::H2 { /*recv: ref mut h2,*/ .. } => {
unimplemented!("h2.poll_trailers");
//h2.poll_trailers().map_err(crate::Error::new_h2)
Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) {
Some(Ok(t)) => Poll::Ready(Some(Ok(t))),
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
None => Poll::Ready(None),
},
_ => Poll::Ready(None),
}
Expand Down
84 changes: 22 additions & 62 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<
>;
type ConnEither<T, B> = Either<
Http1Dispatcher<T, B, proto::h1::ClientTransaction>,
proto::h2::Client<T, B>,
proto::h2::ClientTask<B>,
>;

/// Returns a `Handshake` future over some IO.
/// Returns a handshake future over some IO.
///
/// This is a shortcut for `Builder::new().handshake(io)`.
pub fn handshake<T>(io: T) -> Handshake<T, crate::Body>
pub async fn handshake<T>(io: T) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
Builder::new()
.handshake(io)
.await
}

/// The sender side of an established connection.
Expand All @@ -68,7 +69,7 @@ where

/// A builder to configure an HTTP connection.
///
/// After setting options, the builder is used to create a `Handshake` future.
/// After setting options, the builder is used to create a handshake future.
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
Expand All @@ -80,16 +81,6 @@ pub struct Builder {
h2_builder: h2::client::Builder,
}

/// A future setting up HTTP over an IO object.
///
/// If successful, yields a `(SendRequest, Connection)` pair.
#[must_use = "futures do nothing unless polled"]
pub struct Handshake<T, B> {
builder: Builder,
io: Option<T>,
_marker: PhantomData<fn(B)>,
}

/// A future returned by `SendRequest::send_request`.
///
/// Yields a `Response` if successful.
Expand Down Expand Up @@ -334,7 +325,8 @@ impl<B> Clone for Http2SendRequest<B> {
impl<T, B> Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
/// Return the inner IO object, and additional information.
///
Expand Down Expand Up @@ -365,29 +357,20 @@ where
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
/// to work with this function; or use the `without_shutdown` wrapper.
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
where
B: Unpin,
{
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
match self.inner.as_mut().expect("already upgraded") {
&mut Either::Left(ref mut h1) => {
h1.poll_without_shutdown(cx)
},
&mut Either::Right(ref mut h2) => {
unimplemented!("h2 poll_without_shutdown");
/*
h2.poll().map(|x| x.map(|_| ()))
*/
Pin::new(h2).poll(cx).map_ok(|_| ())
}
}
}

/// Prevent shutdown of the underlying IO object at the end of service the request,
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
pub fn without_shutdown(self) -> impl Future<Output=crate::Result<Parts<T>>>
where
B: Unpin,
{
pub fn without_shutdown(self) -> impl Future<Output=crate::Result<Parts<T>>> {
let mut conn = Some(self);
future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
Expand All @@ -400,6 +383,7 @@ impl<T, B> Future for Connection<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + Unpin + 'static,
B::Data: Unpin,
{
type Output = crate::Result<()>;

Expand Down Expand Up @@ -522,70 +506,46 @@ impl Builder {
}

/// Constructs a connection with the configured options and IO.
#[inline]
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
pub async fn handshake<T, B>(self, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
B::Data: Unpin,
{
trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 });
Handshake {
builder: self.clone(),
io: Some(io),
_marker: PhantomData,
}
}
}

// ===== impl Handshake

impl<T, B> Future for Handshake<T, B>
where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
B: Payload + 'static,
{
type Output = crate::Result<(SendRequest<B>, Connection<T, B>)>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let either = if !self.builder.http2 {
let either = if !self.http2 {
let mut conn = proto::Conn::new(io);
if !self.builder.h1_writev {
if !self.h1_writev {
conn.set_write_strategy_flatten();
}
if self.builder.h1_title_case_headers {
if self.h1_title_case_headers {
conn.set_title_case_headers();
}
if let Some(sz) = self.builder.h1_read_buf_exact_size {
if let Some(sz) = self.h1_read_buf_exact_size {
conn.set_read_buf_exact_size(sz);
}
if let Some(max) = self.builder.h1_max_buf_size {
if let Some(max) = self.h1_max_buf_size {
conn.set_max_buf_size(max);
}
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
Either::Left(dispatch)
} else {
let h2 = proto::h2::Client::new(io, rx, &self.builder.h2_builder, self.builder.exec.clone());
let h2 = proto::h2::client::handshake(io, rx, &self.h2_builder, self.exec.clone())
.await?;
Either::Right(h2)
};

Poll::Ready(Ok((
Ok((
SendRequest {
dispatch: tx,
},
Connection {
inner: Some(either),
},
)))
}
}

impl<T, B> fmt::Debug for Handshake<T, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Handshake")
.finish()
))
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ where C: Connect + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Payload + Unpin + Send + 'static,
B::Data: Send,
B::Data: Send + Unpin,
{
/// Send a `GET` request to the supplied `Uri`.
///
Expand Down Expand Up @@ -512,8 +512,10 @@ where C: Connect + Sync + 'static,
connecting
};
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
Either::Left(conn_builder
Either::Left(Box::pin(conn_builder
.http2_only(is_h2)
// TODO: convert client::conn::Builder to be by-value?
.clone()
.handshake(io)
.and_then(move |(tx, conn)| {
trace!("handshake complete, spawning background dispatcher task");
Expand Down Expand Up @@ -541,7 +543,7 @@ where C: Connect + Sync + 'static,
PoolTx::Http1(tx)
},
})
}))
})))
}))
})
}
Expand Down
Loading