Skip to content

Commit 71da8d1

Browse files
committed
Start hooking up sending data
1 parent 6053ee0 commit 71da8d1

File tree

6 files changed

+76
-18
lines changed

6 files changed

+76
-18
lines changed

src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl<B: IntoBuf> Stream<B> {
160160
pub fn send_data(&mut self, data: B, end_of_stream: bool)
161161
-> Result<(), ConnectionError>
162162
{
163-
unimplemented!();
163+
self.inner.send_data(data.into_buf(), end_of_stream)
164164
}
165165

166166
/// Send trailers

src/proto/streams/recv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ impl<P, B> Recv<P, B>
7373
// Increment the number of remote initiated streams
7474
self.num_streams += 1;
7575

76-
Ok(Some(Stream::new()))
76+
Ok(Some(Stream::new(id)))
7777
}
7878

7979
/// Transition the stream state based on receiving headers

src/proto/streams/send.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ impl<P, B> Send<P, B>
6767
/// Update state reflecting a new, locally opened stream
6868
///
6969
/// Returns the stream state if successful. `None` if refused
70-
pub fn open(&mut self) -> Result<(StreamId, Stream<B>), ConnectionError> {
70+
pub fn open(&mut self) -> Result<Stream<B>, ConnectionError> {
7171
try!(self.ensure_can_open());
7272

7373
if let Some(max) = self.max_streams {
@@ -76,7 +76,7 @@ impl<P, B> Send<P, B>
7676
}
7777
}
7878

79-
let ret = (self.next_stream_id, Stream::new());
79+
let ret = Stream::new(self.next_stream_id);
8080

8181
// Increment the number of locally initiated streams
8282
self.num_streams += 1;
@@ -106,8 +106,8 @@ impl<P, B> Send<P, B>
106106
}
107107

108108
pub fn send_data(&mut self,
109-
frame: &frame::Data<B>,
110-
stream: &mut Stream<B>)
109+
frame: frame::Data<B>,
110+
stream: &mut store::Ptr<B>)
111111
-> Result<(), ConnectionError>
112112
{
113113
let sz = frame.payload().remaining();
@@ -148,6 +148,8 @@ impl<P, B> Send<P, B>
148148
try!(stream.state.send_close());
149149
}
150150

151+
self.prioritize.queue_frame(frame.into(), stream);
152+
151153
Ok(())
152154
}
153155

src/proto/streams/stream.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ use super::*;
22

33
#[derive(Debug)]
44
pub(super) struct Stream<B> {
5+
/// The h2 stream identifier
6+
pub id: StreamId,
7+
58
/// Current state of the stream
69
pub state: State,
710

@@ -22,8 +25,9 @@ pub(super) struct Stream<B> {
2225
}
2326

2427
impl<B> Stream<B> {
25-
pub fn new() -> Stream<B> {
28+
pub fn new(id: StreamId) -> Stream<B> {
2629
Stream {
30+
id,
2731
state: State::default(),
2832
pending_recv: buffer::Deque::new(),
2933
recv_task: None,

src/proto/streams/streams.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ impl<P, B> Streams<P, B>
200200
*/
201201
}
202202

203+
/*
203204
pub fn send_data(&mut self, frame: &frame::Data<B>)
204205
-> Result<(), ConnectionError>
205206
{
@@ -222,6 +223,7 @@ impl<P, B> Streams<P, B>
222223
223224
Ok(())
224225
}
226+
*/
225227

226228
pub fn poll_window_update(&mut self)
227229
-> Poll<WindowUpdate, ConnectionError>
@@ -290,13 +292,13 @@ impl<B> Streams<client::Peer, B>
290292
let me = &mut *me;
291293

292294
// Initialize a new stream. This fails if the connection is at capacity.
293-
let (id, mut stream) = me.actions.send.open()?;
295+
let mut stream = me.actions.send.open()?;
294296

295297
// Convert the message
296298
let headers = client::Peer::convert_send_message(
297-
id, request, end_of_stream);
299+
stream.id, request, end_of_stream);
298300

299-
let mut stream = me.store.insert(id, stream);
301+
let mut stream = me.store.insert(stream.id, stream);
300302

301303
me.actions.send.send_headers(headers, &mut stream)?;
302304

@@ -320,6 +322,27 @@ impl<P, B> StreamRef<P, B>
320322
where P: Peer,
321323
B: Buf,
322324
{
325+
pub fn send_data(&mut self, data: B, end_of_stream: bool)
326+
-> Result<(), ConnectionError>
327+
{
328+
let mut me = self.inner.lock().unwrap();
329+
let me = &mut *me;
330+
331+
let mut stream = me.store.resolve(self.key);
332+
333+
// Create the data frame
334+
let frame = frame::Data::from_buf(stream.id, data, end_of_stream);
335+
336+
// Send the data frame
337+
me.actions.send.send_data(frame, &mut stream)?;
338+
339+
if stream.state.is_closed() {
340+
me.actions.dec_num_streams(stream.id);
341+
}
342+
343+
Ok(())
344+
}
345+
323346
pub fn poll_data(&mut self) -> Poll<Option<Chunk<P, B>>, ConnectionError> {
324347
let recv = {
325348
let mut me = self.inner.lock().unwrap();

tests/stream_states.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ fn send_recv_headers_only() {
3838
h2.wait().unwrap();
3939
}
4040

41-
/*
4241
#[test]
4342
fn send_recv_data() {
4443
let _ = env_logger::init();
@@ -64,14 +63,42 @@ fn send_recv_data() {
6463
])
6564
.build();
6665

67-
let h2 = client::handshake(mock).wait().expect("handshake");
66+
let mut h2 = Client::handshake2(mock)
67+
.wait().unwrap();
6868

69-
// Send the request
70-
let mut request = request::Head::default();
71-
request.method = method::POST;
72-
request.uri = "https://http2.akamai.com/".parse().unwrap();
73-
let h2 = h2.send_request(1.into(), request, false).wait().expect("send request");
69+
let request = Request::builder()
70+
.method(method::POST)
71+
.uri("https://http2.akamai.com/")
72+
.body(()).unwrap();
73+
74+
info!("sending request");
75+
let mut stream = h2.request(request, false).unwrap();
76+
77+
// Send the data
78+
stream.send_data("hello", true).unwrap();
79+
80+
// Get the response
81+
let resp = h2.run(poll_fn(|| stream.poll_response())).unwrap();
82+
assert_eq!(resp.status(), status::OK);
83+
84+
// Take the body
85+
let (_, body) = resp.into_parts();
86+
87+
// Wait for all the data frames to be received
88+
let mut chunks = h2.run(body.collect()).unwrap();
7489

90+
// Only one chunk since two frames are coalesced.
91+
assert_eq!(1, chunks.len());
92+
93+
let data = chunks[0].pop_bytes().unwrap();
94+
assert_eq!(data, &b"world"[..]);
95+
96+
assert!(chunks[0].pop_bytes().is_none());
97+
98+
// The H2 connection is closed
99+
h2.wait().unwrap();
100+
101+
/*
75102
let b = "hello";
76103
77104
// Send the data
@@ -100,8 +127,8 @@ fn send_recv_data() {
100127
}
101128
102129
assert!(Stream::wait(h2).next().is_none());;
130+
*/
103131
}
104-
*/
105132

106133
#[test]
107134
fn send_headers_recv_data_single_frame() {
@@ -151,6 +178,8 @@ fn send_headers_recv_data_single_frame() {
151178
let data = chunks[0].pop_bytes().unwrap();
152179
assert_eq!(data, &b"world"[..]);
153180

181+
assert!(chunks[0].pop_bytes().is_none());
182+
154183
// The H2 connection is closed
155184
h2.wait().unwrap();
156185
}

0 commit comments

Comments
 (0)