Skip to content

Commit a880251

Browse files
send reset signal when read Error occurs
1 parent 75e25b5 commit a880251

File tree

2 files changed

+30
-12
lines changed

2 files changed

+30
-12
lines changed

misc/webrtc-utils/src/stream.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::{
3030

3131
use crate::proto::{Flag, Message};
3232
use crate::{
33-
stream::drop_listener::GracefullyClosed,
33+
stream::drop_listener::DropMessage,
3434
stream::framed_dc::FramedDc,
3535
stream::state::{Closing, State},
3636
};
@@ -62,7 +62,7 @@ pub struct Stream<T> {
6262
state: State,
6363
read_buffer: Bytes,
6464
/// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`.
65-
drop_notifier: Option<oneshot::Sender<GracefullyClosed>>,
65+
drop_notifier: Option<oneshot::Sender<DropMessage>>,
6666
}
6767

6868
impl<T> Stream<T>
@@ -139,21 +139,29 @@ where
139139
..
140140
} = &mut *self;
141141

142-
match ready!(io_poll_next(io, cx))? {
143-
Some((flag, message)) => {
142+
match ready!(io_poll_next(io, cx)) {
143+
Ok(Some((flag, message))) => {
144144
if let Some(flag) = flag {
145145
state.handle_inbound_flag(flag, read_buffer);
146146
}
147147

148-
debug_assert!(read_buffer.is_empty());
149148
if let Some(message) = message {
150149
*read_buffer = message.into();
151150
}
152151
}
153-
None => {
152+
Ok(None) => {
154153
state.handle_inbound_flag(Flag::FIN, read_buffer);
155154
return Poll::Ready(Ok(0));
156155
}
156+
Err(e) => {
157+
log::error!("Error while reading next message: {:?}", e);
158+
let _ = self
159+
.drop_notifier
160+
.take()
161+
.expect("should be able to take drop_notifier value")
162+
.send(DropMessage::SendReset);
163+
return Poll::Ready(Err(e));
164+
}
157165
}
158166
}
159167
}
@@ -231,7 +239,7 @@ where
231239
.drop_notifier
232240
.take()
233241
.expect("to not close twice")
234-
.send(GracefullyClosed {});
242+
.send(DropMessage::GracefullyClosed);
235243

236244
return Poll::Ready(Ok(()));
237245
}

misc/webrtc-utils/src/stream/drop_listener.rs

+15-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub struct DropListener<T> {
3636
}
3737

3838
impl<T> DropListener<T> {
39-
pub fn new(stream: FramedDc<T>, receiver: oneshot::Receiver<GracefullyClosed>) -> Self {
39+
pub fn new(stream: FramedDc<T>, receiver: oneshot::Receiver<DropMessage>) -> Self {
4040
Self {
4141
state: State::Idle { stream, receiver },
4242
}
@@ -47,7 +47,7 @@ enum State<T> {
4747
/// The [`DropListener`] is idle and waiting to be activated.
4848
Idle {
4949
stream: FramedDc<T>,
50-
receiver: oneshot::Receiver<GracefullyClosed>,
50+
receiver: oneshot::Receiver<DropMessage>,
5151
},
5252
/// The stream got dropped and we are sending a reset flag.
5353
SendingReset {
@@ -75,9 +75,14 @@ where
7575
stream,
7676
mut receiver,
7777
} => match receiver.poll_unpin(cx) {
78-
Poll::Ready(Ok(GracefullyClosed {})) => {
78+
Poll::Ready(Ok(DropMessage::GracefullyClosed)) => {
7979
return Poll::Ready(Ok(()));
8080
}
81+
Poll::Ready(Ok(DropMessage::SendReset)) => {
82+
log::info!("Stream gracefully errored, sending Reset");
83+
*state = State::SendingReset { stream };
84+
continue;
85+
}
8186
Poll::Ready(Err(Canceled)) => {
8287
log::info!("Stream dropped without graceful close, sending Reset");
8388
*state = State::SendingReset { stream };
@@ -117,5 +122,10 @@ where
117122
}
118123
}
119124

120-
/// Indicates that our stream got gracefully closed.
121-
pub struct GracefullyClosed {}
125+
/// The reason we are dropping the Stream.
126+
pub enum DropMessage {
127+
/// The stream was closed gracefully.
128+
GracefullyClosed,
129+
/// The stream errored (such as receiving too much data) and we are sending a reset signal
130+
SendReset,
131+
}

0 commit comments

Comments
 (0)