Skip to content

Commit f41a0b2

Browse files
committed
Bring Body::channel implementation from Hyper
Given the lack of a public implementation anymore, we don't have other choice but copying Hyper's private implementation. Signed-off-by: David Calavera <[email protected]>
1 parent 8136c94 commit f41a0b2

File tree

11 files changed

+363
-84
lines changed

11 files changed

+363
-84
lines changed

examples/basic-streaming-response/Cargo.toml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,6 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
hyper = { version = "0.14", features = [
10-
"http1",
11-
"client",
12-
"stream",
13-
] }
149
lambda_runtime = { path = "../../lambda-runtime" }
1510
tokio = { version = "1", features = ["macros"] }
1611
tracing = { version = "0.1", features = ["log"] }

examples/basic-streaming-response/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use hyper::body::Body;
2-
use lambda_runtime::{service_fn, Error, LambdaEvent, StreamResponse};
1+
use lambda_runtime::{body::Body, service_fn, Error, LambdaEvent, StreamResponse};
32
use serde_json::Value;
43
use std::{thread, time::Duration};
54

lambda-http/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,22 @@ alb = []
2525
[dependencies]
2626
base64 = "0.21"
2727
bytes = { workspace = true }
28+
encoding_rs = "0.8"
2829
futures = { workspace = true }
2930
futures-util = { workspace = true }
3031
http = { workspace = true }
3132
http-body = { workspace = true }
3233
http-body-util = { workspace = true }
3334
hyper = { workspace = true }
3435
lambda_runtime = { path = "../lambda-runtime", version = "0.8.3" }
36+
mime = "0.3"
37+
percent-encoding = "2.2"
38+
pin-project-lite = { workspace = true }
3539
serde = { version = "1.0", features = ["derive"] }
3640
serde_json = "1.0"
3741
serde_urlencoded = "0.7"
3842
tokio-stream = "0.1.2"
39-
mime = "0.3"
40-
encoding_rs = "0.8"
4143
url = "2.2"
42-
percent-encoding = "2.2"
4344

4445
[dependencies.aws_lambda_events]
4546
path = "../lambda-events"

lambda-http/src/streaming.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,19 +63,11 @@ where
6363
lambda_runtime::run(svc).await
6464
}
6565

66+
pin_project_lite::pin_project! {
6667
pub struct BodyStream<B> {
68+
#[pin]
6769
pub(crate) body: B,
6870
}
69-
70-
impl<B> BodyStream<B>
71-
where
72-
B: Body + Unpin + Send + 'static,
73-
B::Data: Into<Bytes> + Send,
74-
B::Error: Into<Error> + Send + Debug,
75-
{
76-
fn project(self: Pin<&mut Self>) -> Pin<&mut B> {
77-
unsafe { self.map_unchecked_mut(|s| &mut s.body) }
78-
}
7971
}
8072

8173
impl<B> Stream for BodyStream<B>
@@ -86,13 +78,16 @@ where
8678
{
8779
type Item = Result<B::Data, B::Error>;
8880

89-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90-
match futures_util::ready!(self.project().poll_frame(cx)?) {
91-
Some(frame) => match frame.into_data() {
92-
Ok(data) => Poll::Ready(Some(Ok(data))),
93-
Err(_frame) => Poll::Ready(None),
94-
},
95-
None => Poll::Ready(None),
81+
#[inline]
82+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83+
loop {
84+
match futures_util::ready!(self.as_mut().project().body.poll_frame(cx)?) {
85+
Some(frame) => match frame.into_data() {
86+
Ok(data) => return Poll::Ready(Some(Ok(data))),
87+
Err(_frame) => {}
88+
},
89+
None => return Poll::Ready(None),
90+
}
9691
}
9792
}
9893
}

lambda-runtime-api-client/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ hyper-util = { workspace = true, features = [
2727
"http1",
2828
"tokio",
2929
] }
30-
pin-project-lite = "0.2"
3130
sync_wrapper = "0.1.2"
3231
tower = { workspace = true, features = ["util"] }
3332
tower-service = { workspace = true }
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
//! Body::channel utilities. Extracted from Hyper under MIT license.
2+
//! https://github.com/hyperium/hyper/blob/master/LICENSE
3+
4+
use std::pin::Pin;
5+
use std::task::Context;
6+
use std::task::Poll;
7+
8+
use crate::body::{sender, watch};
9+
use bytes::Bytes;
10+
use futures_channel::mpsc;
11+
use futures_channel::oneshot;
12+
use futures_util::{stream::FusedStream, Future, Stream};
13+
use http::HeaderMap;
14+
use http_body::Body;
15+
use http_body::Frame;
16+
use http_body::SizeHint;
17+
pub use sender::Sender;
18+
19+
#[derive(Clone, Copy, PartialEq, Eq)]
20+
pub(crate) struct DecodedLength(u64);
21+
22+
impl DecodedLength {
23+
pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(::std::u64::MAX);
24+
pub(crate) const CHUNKED: DecodedLength = DecodedLength(::std::u64::MAX - 1);
25+
pub(crate) const ZERO: DecodedLength = DecodedLength(0);
26+
27+
pub(crate) fn sub_if(&mut self, amt: u64) {
28+
match *self {
29+
DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => (),
30+
DecodedLength(ref mut known) => {
31+
*known -= amt;
32+
}
33+
}
34+
}
35+
36+
/// Converts to an Option<u64> representing a Known or Unknown length.
37+
pub(crate) fn into_opt(self) -> Option<u64> {
38+
match self {
39+
DecodedLength::CHUNKED | DecodedLength::CLOSE_DELIMITED => None,
40+
DecodedLength(known) => Some(known),
41+
}
42+
}
43+
}
44+
45+
pub struct ChannelBody {
46+
content_length: DecodedLength,
47+
want_tx: watch::Sender,
48+
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
49+
trailers_rx: oneshot::Receiver<HeaderMap>,
50+
}
51+
52+
pub fn channel() -> (Sender, ChannelBody) {
53+
let (data_tx, data_rx) = mpsc::channel(0);
54+
let (trailers_tx, trailers_rx) = oneshot::channel();
55+
56+
let (want_tx, want_rx) = watch::channel(sender::WANT_READY);
57+
58+
let tx = Sender {
59+
want_rx,
60+
data_tx,
61+
trailers_tx: Some(trailers_tx),
62+
};
63+
let rx = ChannelBody {
64+
content_length: DecodedLength::CHUNKED,
65+
want_tx,
66+
data_rx,
67+
trailers_rx,
68+
};
69+
70+
(tx, rx)
71+
}
72+
73+
impl Body for ChannelBody {
74+
type Data = Bytes;
75+
type Error = crate::Error;
76+
77+
fn poll_frame(
78+
mut self: Pin<&mut Self>,
79+
cx: &mut Context<'_>,
80+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
81+
self.want_tx.send(sender::WANT_READY);
82+
83+
if !self.data_rx.is_terminated() {
84+
if let Some(chunk) = ready!(Pin::new(&mut self.data_rx).poll_next(cx)?) {
85+
self.content_length.sub_if(chunk.len() as u64);
86+
return Poll::Ready(Some(Ok(Frame::data(chunk))));
87+
}
88+
}
89+
90+
// check trailers after data is terminated
91+
match ready!(Pin::new(&mut self.trailers_rx).poll(cx)) {
92+
Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
93+
Err(_) => Poll::Ready(None),
94+
}
95+
}
96+
97+
fn is_end_stream(&self) -> bool {
98+
self.content_length == DecodedLength::ZERO
99+
}
100+
101+
fn size_hint(&self) -> SizeHint {
102+
let mut hint = SizeHint::default();
103+
104+
if let Some(content_length) = self.content_length.into_opt() {
105+
hint.set_exact(content_length);
106+
}
107+
108+
hint
109+
}
110+
}

lambda-runtime-api-client/src/body.rs renamed to lambda-runtime-api-client/src/body/mod.rs

Lines changed: 31 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,26 @@
33
44
use crate::{BoxError, Error};
55
use bytes::Bytes;
6-
use futures_channel::mpsc::{self, Sender};
76
use futures_util::stream::Stream;
8-
use futures_util::TryStream;
97
use http_body::{Body as _, Frame};
108
use http_body_util::{BodyExt, Collected};
11-
use pin_project_lite::pin_project;
129
use std::pin::Pin;
1310
use std::task::{Context, Poll};
14-
use sync_wrapper::SyncWrapper;
11+
12+
use self::channel::Sender;
13+
14+
macro_rules! ready {
15+
($e:expr) => {
16+
match $e {
17+
std::task::Poll::Ready(v) => v,
18+
std::task::Poll::Pending => return std::task::Poll::Pending,
19+
}
20+
};
21+
}
22+
23+
mod channel;
24+
mod sender;
25+
mod watch;
1526

1627
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
1728

@@ -56,24 +67,24 @@ impl Body {
5667
}
5768

5869
/// Create a new `Body` stream with associated Sender half.
59-
pub fn channel() -> (Sender<Result<Bytes, BoxError>>, Body) {
60-
let (sender, recv) = mpsc::channel::<Result<Bytes, BoxError>>(0);
61-
(sender, Self::from_stream(recv))
70+
pub fn channel() -> (Sender, Body) {
71+
let (sender, body) = channel::channel();
72+
(sender, Body::new(body))
6273
}
6374

64-
/// Create a new `Body` from a [`Stream`].
65-
///
66-
/// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
67-
pub fn from_stream<S>(stream: S) -> Self
68-
where
69-
S: TryStream + Send + 'static,
70-
S::Ok: Into<Bytes>,
71-
S::Error: Into<BoxError>,
72-
{
73-
Self::new(StreamBody {
74-
stream: SyncWrapper::new(stream),
75-
})
76-
}
75+
// /// Create a new `Body` from a [`Stream`].
76+
// ///
77+
// /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
78+
// pub fn from_stream<S>(stream: S) -> Self
79+
// where
80+
// S: TryStream + Send + 'static,
81+
// S::Ok: Into<Bytes>,
82+
// S::Error: Into<BoxError>,
83+
// {
84+
// Self::new(StreamBody {
85+
// stream: SyncWrapper::new(stream),
86+
// })
87+
// }
7788

7889
/// Collect the body into `Bytes`
7990
pub async fn collect(self) -> Result<Collected<Bytes>, Error> {
@@ -146,35 +157,3 @@ impl Stream for Body {
146157
}
147158
}
148159
}
149-
150-
pin_project! {
151-
struct StreamBody<S> {
152-
#[pin]
153-
stream: SyncWrapper<S>,
154-
}
155-
}
156-
157-
impl<S> http_body::Body for StreamBody<S>
158-
where
159-
S: TryStream,
160-
S::Ok: Into<Bytes>,
161-
S::Error: Into<BoxError>,
162-
{
163-
type Data = Bytes;
164-
type Error = Error;
165-
166-
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
167-
let stream = self.project().stream.get_pin_mut();
168-
match futures_util::ready!(stream.try_poll_next(cx)) {
169-
Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
170-
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
171-
None => Poll::Ready(None),
172-
}
173-
}
174-
}
175-
176-
#[test]
177-
fn test_try_downcast() {
178-
assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
179-
assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
180-
}

0 commit comments

Comments
 (0)