Skip to content

Commit 29b49ae

Browse files
committed
Provides an EventSource implementation
Modelled on the WebSocket API.
1 parent 1039074 commit 29b49ae

File tree

8 files changed

+316
-11
lines changed

8 files changed

+316
-11
lines changed

.github/workflows/tests.yml

+5-3
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ jobs:
132132
ports:
133133
- 8080:80
134134
echo_server:
135-
image: jmalloc/echo-server@sha256:c461e7e54d947a8777413aaf9c624b4ad1f1bac5d8272475da859ae82c1abd7d
135+
image: jmalloc/echo-server@sha256:e43a10c9ecbd025df7ed6dac1e45551ce7bd676142600b0734fe7dcd10a47abe
136136
ports:
137137
- 8081:8080
138138

@@ -162,7 +162,8 @@ jobs:
162162
- name: Run browser tests
163163
env:
164164
HTTPBIN_URL: "http://localhost:8080"
165-
ECHO_SERVER_URL: "ws://localhost:8081"
165+
WS_ECHO_SERVER_URL: "ws://localhost:8081"
166+
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
166167
run: |
167168
cd crates/net
168169
wasm-pack test --chrome --firefox --headless --all-features
@@ -177,7 +178,8 @@ jobs:
177178
- name: Run native tests
178179
env:
179180
HTTPBIN_URL: "http://localhost:8080"
180-
ECHO_SERVER_URL: "ws://localhost:8081"
181+
WS_ECHO_SERVER_URL: "ws://localhost:8081"
182+
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
181183
uses: actions-rs/cargo@v1
182184
with:
183185
command: test

crates/net/Cargo.toml

+12-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ wasm-bindgen = "0.2"
1919
web-sys = "0.3"
2020
js-sys = "0.3"
2121
gloo-utils = { version = "0.1", path = "../utils", features = ["serde"] }
22+
gloo-console = { path = "../console" }
2223

2324
wasm-bindgen-futures = "0.4"
2425
futures-core = { version = "0.3", optional = true }
@@ -37,7 +38,7 @@ wasm-bindgen-test = "0.3"
3738
futures = "0.3"
3839

3940
[features]
40-
default = ["json", "websocket", "http"]
41+
default = ["json", "websocket", "http", "eventsource"]
4142

4243
# Enables `.json()` on `Response`
4344
json = ["serde", "serde_json"]
@@ -79,3 +80,13 @@ http = [
7980
'web-sys/Blob',
8081
'web-sys/FormData',
8182
]
83+
# Enables the EventSource API
84+
eventsource = [
85+
"futures-channel",
86+
"futures-core",
87+
"pin-project",
88+
'web-sys/Event',
89+
'web-sys/EventTarget',
90+
'web-sys/EventSource',
91+
'web-sys/MessageEvent',
92+
]

crates/net/README.md

+20-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
<sub>Built with 🦀🕸 by <a href="https://rustwasm.github.io/">The Rust and WebAssembly Working Group</a></sub>
2020
</div>
2121

22-
HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` `fetch` and `WebSocket` API
22+
HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` `EventSource`, `fetch` and `WebSocket` APIs.
2323

2424
## Examples
2525

@@ -55,3 +55,22 @@ spawn_local(async move {
5555
console_log!("WebSocket Closed")
5656
})
5757
```
58+
59+
### EventSource
60+
61+
```rust
62+
use gloo_net::eventsource::futures::EventSource;
63+
use wasm_bindgen_futures::spawn_local;
64+
use futures::StreamExt;
65+
66+
let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
67+
es.subscribe_event("some-event-type").unwrap();
68+
es.subscribe_event("another-event-type").unwrap();
69+
70+
spawn_local(async move {
71+
while let Some((event_type, msg)) = es.next().await {
72+
console_log!(format!("1. {}: {:?}", event_type. msg))
73+
}
74+
console_log!("EventSource Closed");
75+
})
76+
```

crates/net/src/error.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ pub enum Error {
1818
),
1919
}
2020

21-
#[cfg(any(feature = "http", feature = "websocket"))]
21+
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
2222
pub(crate) use conversion::*;
23-
#[cfg(any(feature = "http", feature = "websocket"))]
23+
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
2424
mod conversion {
2525
use gloo_utils::errors::JsError;
2626
use std::convert::TryFrom;

crates/net/src/eventsource/futures.rs

+232
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
//! A wrapper around the `EventSource` API using the Futures API to be used with async rust.
2+
//!
3+
//! EventSource is similar to WebSocket with the major differences being:
4+
//!
5+
//! * they are a one-way stream of server generated events
6+
//! * their connection is managed entirely by the browser
7+
//! * their data is slightly more structured including an id, type and data
8+
//!
9+
//! EventSource is therefore suitable for simpler scenarios than WebSocket.
10+
//!
11+
//! See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) to learn more.
12+
//!
13+
//! # Example
14+
//!
15+
//! ```rust
16+
//! use gloo_net::eventsource::futures::EventSource;
17+
//! use wasm_bindgen_futures::spawn_local;
18+
//! use futures::StreamExt;
19+
//!
20+
//! # macro_rules! console_log {
21+
//! # ($($expr:expr),*) => {{}};
22+
//! # }
23+
//! # fn no_run() {
24+
//! let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
25+
//! es.subscribe_event("some-event-type").unwrap();
26+
//! es.subscribe_event("another-event-type").unwrap();
27+
//!
28+
//! spawn_local(async move {
29+
//! while let Some((event_type, msg)) = es.next().await {
30+
//! console_log!(format!("1. {}: {:?}", event_type, msg))
31+
//! }
32+
//! console_log!("EventSource Closed");
33+
//! })
34+
//! # }
35+
//! ```
36+
use crate::eventsource::{EventSourceError, State};
37+
use crate::js_to_js_error;
38+
use futures_channel::mpsc;
39+
use futures_core::{ready, Stream};
40+
use gloo_utils::errors::JsError;
41+
use pin_project::{pin_project, pinned_drop};
42+
use std::ops::DerefMut;
43+
use std::pin::Pin;
44+
use std::sync::{Arc, Mutex};
45+
use std::task::{Context, Poll};
46+
use wasm_bindgen::prelude::*;
47+
use wasm_bindgen::JsCast;
48+
use web_sys::MessageEvent;
49+
50+
/// Wrapper around browser's EventSource API.
51+
#[allow(missing_debug_implementations)]
52+
#[pin_project(PinnedDrop)]
53+
pub struct EventSource {
54+
es: web_sys::EventSource,
55+
message_sender: mpsc::UnboundedSender<StreamMessage>,
56+
#[pin]
57+
message_receiver: mpsc::UnboundedReceiver<StreamMessage>,
58+
#[allow(clippy::type_complexity)]
59+
closures: Arc<
60+
Mutex<(
61+
Vec<Closure<dyn FnMut(MessageEvent)>>,
62+
Closure<dyn FnMut(web_sys::Event)>,
63+
)>,
64+
>,
65+
}
66+
67+
impl EventSource {
68+
/// Establish an EventSource.
69+
///
70+
/// This function may error in the following cases:
71+
/// - The connection url is invalid
72+
///
73+
/// The error returned is [`JsError`]. See the
74+
/// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource#exceptions_thrown)
75+
/// to learn more.
76+
pub fn new(url: &str) -> Result<Self, JsError> {
77+
let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?;
78+
79+
let (message_sender, message_receiver) = mpsc::unbounded();
80+
81+
let error_callback: Closure<dyn FnMut(web_sys::Event)> = {
82+
let sender = message_sender.clone();
83+
Closure::wrap(Box::new(move |e: web_sys::Event| {
84+
let sender = sender.clone();
85+
let is_connecting = e
86+
.current_target()
87+
.and_then(|target| target.dyn_into::<web_sys::EventSource>().ok())
88+
.map(|es| es.ready_state() == web_sys::EventSource::CONNECTING)
89+
.unwrap_or(false);
90+
if !is_connecting {
91+
let _ = sender.unbounded_send(StreamMessage::ErrorEvent);
92+
};
93+
}) as Box<dyn FnMut(web_sys::Event)>)
94+
};
95+
96+
es.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
97+
98+
Ok(Self {
99+
es,
100+
message_sender,
101+
message_receiver,
102+
closures: Arc::new(Mutex::new((vec![], error_callback))),
103+
})
104+
}
105+
106+
/// Subscribes to listening for a specific type of event. Can be
107+
/// called multiple times.
108+
///
109+
/// All event types are streamed back with the element of the stream
110+
/// being a tuple of event type and message event.
111+
///
112+
/// The event type of "message" is a special case, as it will capture
113+
/// events without an event field as well as events that have the
114+
/// specific type `event: message`. It will not trigger on any
115+
/// other event type.
116+
pub fn subscribe_event(&mut self, event_type: &str) -> Result<(), JsError> {
117+
let event_type = event_type.to_string();
118+
match self.closures.lock() {
119+
Ok(mut closures) => {
120+
let (message_callbacks, _) = closures.deref_mut();
121+
122+
let message_callback: Closure<dyn FnMut(MessageEvent)> = {
123+
let sender = self.message_sender.clone();
124+
let event_type = event_type.to_string();
125+
Closure::wrap(Box::new(move |e: MessageEvent| {
126+
let sender = sender.clone();
127+
let event_type = event_type.to_string();
128+
let _ = sender.unbounded_send(StreamMessage::Message(event_type, e));
129+
}) as Box<dyn FnMut(MessageEvent)>)
130+
};
131+
132+
self.es
133+
.add_event_listener_with_callback(
134+
&event_type,
135+
message_callback.as_ref().unchecked_ref(),
136+
)
137+
.map_err(js_to_js_error)?;
138+
139+
message_callbacks.push(message_callback);
140+
Ok(())
141+
}
142+
Err(e) => Err(js_sys::Error::new(&format!("Failed to subscribe: {}", e)).into()),
143+
}
144+
}
145+
146+
/// Closes the EventSource.
147+
///
148+
/// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close#parameters)
149+
/// to learn about this function
150+
pub fn close(self) {
151+
self.es.close();
152+
}
153+
154+
/// The current state of the EventSource.
155+
pub fn state(&self) -> State {
156+
let ready_state = self.es.ready_state();
157+
match ready_state {
158+
0 => State::Connecting,
159+
1 => State::Open,
160+
2 => State::Closed,
161+
_ => unreachable!(),
162+
}
163+
}
164+
}
165+
166+
#[derive(Clone)]
167+
enum StreamMessage {
168+
ErrorEvent,
169+
Message(String, MessageEvent),
170+
}
171+
172+
impl Stream for EventSource {
173+
type Item = Result<(String, MessageEvent), EventSourceError>;
174+
175+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176+
let msg = ready!(self.project().message_receiver.poll_next(cx));
177+
match msg {
178+
Some(StreamMessage::Message(event_type, msg)) => {
179+
Poll::Ready(Some(Ok((event_type, msg))))
180+
}
181+
Some(StreamMessage::ErrorEvent) => {
182+
Poll::Ready(Some(Err(EventSourceError::ConnectionError)))
183+
}
184+
None => Poll::Ready(None),
185+
}
186+
}
187+
}
188+
189+
#[pinned_drop]
190+
impl PinnedDrop for EventSource {
191+
fn drop(self: Pin<&mut Self>) {
192+
self.es.close();
193+
}
194+
}
195+
196+
#[cfg(test)]
197+
mod tests {
198+
use super::*;
199+
use futures::StreamExt;
200+
use wasm_bindgen_futures::spawn_local;
201+
use wasm_bindgen_test::*;
202+
203+
wasm_bindgen_test_configure!(run_in_browser);
204+
205+
const SSE_ECHO_SERVER_URL: &str = env!("SSE_ECHO_SERVER_URL");
206+
207+
#[wasm_bindgen_test]
208+
fn eventsource_works() {
209+
let mut es = EventSource::new(SSE_ECHO_SERVER_URL).unwrap();
210+
es.subscribe_event("server").unwrap();
211+
es.subscribe_event("request").unwrap();
212+
213+
spawn_local(async move {
214+
assert_eq!(es.next().await.unwrap().unwrap().0, "server".to_string());
215+
assert_eq!(es.next().await.unwrap().unwrap().0, "request".to_string());
216+
});
217+
}
218+
219+
#[wasm_bindgen_test]
220+
fn eventsource_close_works() {
221+
let mut es = EventSource::new("rubbish").unwrap();
222+
223+
spawn_local(async move {
224+
// we should expect an immediate failure
225+
226+
assert_eq!(
227+
es.next().await,
228+
Some(Err(EventSourceError::ConnectionError))
229+
);
230+
})
231+
}
232+
}

crates/net/src/eventsource/mod.rs

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
//! Wrapper around the `EventSource` API
2+
//!
3+
//! This API is provided in the following flavors:
4+
//! - [Futures API][futures]
5+
6+
pub mod futures;
7+
8+
use std::fmt;
9+
10+
/// The state of the EventSource.
11+
///
12+
/// See [`EventSource.readyState` on MDN](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/readyState)
13+
/// to learn more.
14+
#[derive(Copy, Clone, Debug)]
15+
pub enum State {
16+
/// The connection has not yet been established.
17+
Connecting,
18+
/// The EventSource connection is established and communication is possible.
19+
Open,
20+
/// The connection has been closed or could not be opened.
21+
Closed,
22+
}
23+
24+
/// Error returned by the EventSource
25+
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
26+
#[non_exhaustive]
27+
pub enum EventSourceError {
28+
/// The `error` event
29+
ConnectionError,
30+
}
31+
32+
impl fmt::Display for EventSourceError {
33+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34+
match self {
35+
EventSourceError::ConnectionError => write!(f, "EventSource connection failed"),
36+
}
37+
}
38+
}

crates/net/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
#![cfg_attr(docsrs, feature(doc_cfg))]
1212

1313
mod error;
14+
#[cfg(feature = "eventsource")]
15+
#[cfg_attr(docsrs, doc(cfg(feature = "eventsource")))]
16+
pub mod eventsource;
1417
#[cfg(feature = "http")]
1518
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
1619
pub mod http;

0 commit comments

Comments
 (0)