diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cebd058f..43732e43 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -130,7 +130,7 @@ jobs: ports: - 8080:80 echo_server: - image: jmalloc/echo-server@sha256:c461e7e54d947a8777413aaf9c624b4ad1f1bac5d8272475da859ae82c1abd7d + image: jmalloc/echo-server@sha256:e43a10c9ecbd025df7ed6dac1e45551ce7bd676142600b0734fe7dcd10a47abe ports: - 8081:8080 @@ -160,7 +160,8 @@ jobs: - name: Run browser tests env: HTTPBIN_URL: "http://localhost:8080" - ECHO_SERVER_URL: "ws://localhost:8081" + WS_ECHO_SERVER_URL: "ws://localhost:8081" + SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse" run: | cd crates/net wasm-pack test --chrome --firefox --headless --all-features @@ -175,7 +176,8 @@ jobs: - name: Run native tests env: HTTPBIN_URL: "http://localhost:8080" - ECHO_SERVER_URL: "ws://localhost:8081" + WS_ECHO_SERVER_URL: "ws://localhost:8081" + SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse" uses: actions-rs/cargo@v1 with: command: test diff --git a/crates/net/Cargo.toml b/crates/net/Cargo.toml index 37ac91d9..21c1dad6 100644 --- a/crates/net/Cargo.toml +++ b/crates/net/Cargo.toml @@ -37,7 +37,7 @@ wasm-bindgen-test = "0.3" futures = "0.3" [features] -default = ["json", "websocket", "http"] +default = ["json", "websocket", "http", "eventsource"] # Enables `.json()` on `Response` json = ["serde", "serde_json"] @@ -79,3 +79,13 @@ http = [ 'web-sys/Blob', 'web-sys/FormData', ] +# Enables the EventSource API +eventsource = [ + "futures-channel", + "futures-core", + "pin-project", + 'web-sys/Event', + 'web-sys/EventTarget', + 'web-sys/EventSource', + 'web-sys/MessageEvent', +] diff --git a/crates/net/README.md b/crates/net/README.md index 8874f42f..9207ba31 100644 --- a/crates/net/README.md +++ b/crates/net/README.md @@ -19,7 +19,7 @@ Built with 🦀🕸 by The Rust and WebAssembly Working Group -HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` `fetch` and `WebSocket` API +HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` [`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API), [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) APIs. ## Examples @@ -55,3 +55,23 @@ spawn_local(async move { console_log!("WebSocket Closed") }) ``` + +### EventSource + +```rust +use gloo_net::eventsource::futures::EventSource; +use wasm_bindgen_futures::spawn_local; +use futures::{stream, StreamExt}; + +let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap(); +let stream_1 = es.subscribe("some-event-type").unwrap(); +let stream_2 = es.subscribe("another-event-type").unwrap(); + +spawn_local(async move { + let mut all_streams = stream::select(stream_1, stream_2); + while let Some(Ok((event_type, msg))) = all_streams.next().await { + console_log!(format!("1. {}: {:?}", event_type, msg)) + } + console_log!("EventSource Closed"); +}) +``` diff --git a/crates/net/src/error.rs b/crates/net/src/error.rs index 3c254c21..cbb09ff6 100644 --- a/crates/net/src/error.rs +++ b/crates/net/src/error.rs @@ -18,9 +18,9 @@ pub enum Error { ), } -#[cfg(any(feature = "http", feature = "websocket"))] +#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))] pub(crate) use conversion::*; -#[cfg(any(feature = "http", feature = "websocket"))] +#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))] mod conversion { use gloo_utils::errors::JsError; use std::convert::TryFrom; diff --git a/crates/net/src/eventsource/futures.rs b/crates/net/src/eventsource/futures.rs new file mode 100644 index 00000000..844dbfb1 --- /dev/null +++ b/crates/net/src/eventsource/futures.rs @@ -0,0 +1,272 @@ +//! A wrapper around the `EventSource` API using the Futures API to be used with async rust. +//! +//! EventSource is similar to WebSocket with the major differences being: +//! +//! * they are a one-way stream of server generated events +//! * their connection is managed entirely by the browser +//! * their data is slightly more structured including an id, type and data +//! +//! EventSource is therefore suitable for simpler scenarios than WebSocket. +//! +//! See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) to learn more. +//! +//! # Example +//! +//! ```rust +//! use gloo_net::eventsource::futures::EventSource; +//! use wasm_bindgen_futures::spawn_local; +//! use futures::{stream, StreamExt}; +//! +//! # macro_rules! console_log { +//! # ($($expr:expr),*) => {{}}; +//! # } +//! # fn no_run() { +//! let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap(); +//! let stream_1 = es.subscribe("some-event-type").unwrap(); +//! let stream_2 = es.subscribe("another-event-type").unwrap(); +//! +//! spawn_local(async move { +//! let mut all_streams = stream::select(stream_1, stream_2); +//! while let Some(Ok((event_type, msg))) = all_streams.next().await { +//! console_log!(format!("1. {}: {:?}", event_type, msg)) +//! } +//! console_log!("EventSource Closed"); +//! }) +//! # } +//! ``` +use crate::eventsource::{EventSourceError, State}; +use crate::js_to_js_error; +use futures_channel::mpsc; +use futures_core::{ready, Stream}; +use gloo_utils::errors::JsError; +use pin_project::{pin_project, pinned_drop}; +use std::fmt; +use std::fmt::Formatter; +use std::pin::Pin; +use std::task::{Context, Poll}; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; +use web_sys::MessageEvent; + +/// Wrapper around browser's EventSource API. Dropping +/// this will close the underlying event source. +#[derive(Clone)] +pub struct EventSource { + es: web_sys::EventSource, +} + +impl fmt::Debug for EventSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("EventSource") + .field("url", &self.es.url()) + .field("with_credentials", &self.es.with_credentials()) + .field("ready_state", &self.state()) + .finish_non_exhaustive() + } +} + +/// Wrapper around browser's EventSource API. +#[pin_project(PinnedDrop)] +pub struct EventSourceSubscription { + #[allow(clippy::type_complexity)] + error_callback: Closure, + es: web_sys::EventSource, + event_type: String, + message_callback: Closure, + #[pin] + message_receiver: mpsc::UnboundedReceiver, +} + +impl fmt::Debug for EventSourceSubscription { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("EventSourceSubscription") + .field("event_source", &self.es) + .field("event_type", &self.event_type) + .finish_non_exhaustive() + } +} + +impl EventSource { + /// Establish an EventSource. + /// + /// This function may error in the following cases: + /// - The connection url is invalid + /// + /// The error returned is [`JsError`]. See the + /// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource#exceptions_thrown) + /// to learn more. + pub fn new(url: &str) -> Result { + let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?; + + Ok(Self { es }) + } + + /// Subscribes to listening for a specific type of event. + /// + /// All events for this type are streamed back given the subscription + /// returned. + /// + /// The event type of "message" is a special case, as it will capture + /// events without an event field as well as events that have the + /// specific type `event: message`. It will not trigger on any + /// other event type. + pub fn subscribe( + &mut self, + event_type: impl Into, + ) -> Result { + let event_type = event_type.into(); + let (message_sender, message_receiver) = mpsc::unbounded(); + + let message_callback: Closure = { + let event_type = event_type.clone(); + let sender = message_sender.clone(); + Closure::wrap(Box::new(move |e: MessageEvent| { + let event_type = event_type.clone(); + let _ = sender.unbounded_send(StreamMessage::Message(event_type, e)); + }) as Box) + }; + + self.es + .add_event_listener_with_callback( + &event_type, + message_callback.as_ref().unchecked_ref(), + ) + .map_err(js_to_js_error)?; + + let error_callback: Closure = { + Closure::wrap(Box::new(move |e: web_sys::Event| { + let is_connecting = e + .current_target() + .map(|target| target.unchecked_into::()) + .map(|es| es.ready_state() == web_sys::EventSource::CONNECTING) + .unwrap_or(false); + if !is_connecting { + let _ = message_sender.unbounded_send(StreamMessage::ErrorEvent); + }; + }) as Box) + }; + + self.es + .add_event_listener_with_callback("error", error_callback.as_ref().unchecked_ref()) + .map_err(js_to_js_error)?; + + Ok(EventSourceSubscription { + error_callback, + es: self.es.clone(), + event_type, + message_callback, + message_receiver, + }) + } + + /// Closes the EventSource. + /// + /// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close#parameters) + /// to learn about this function + pub fn close(mut self) { + self.close_and_notify(); + } + + fn close_and_notify(&mut self) { + self.es.close(); + // Fire an error event to cause all subscriber + // streams to close down. + if let Ok(event) = web_sys::Event::new("error") { + let _ = self.es.dispatch_event(&event); + } + } + + /// The current state of the EventSource. + pub fn state(&self) -> State { + let ready_state = self.es.ready_state(); + match ready_state { + 0 => State::Connecting, + 1 => State::Open, + 2 => State::Closed, + _ => unreachable!(), + } + } +} + +impl Drop for EventSource { + fn drop(&mut self) { + self.close_and_notify(); + } +} + +#[derive(Clone)] +enum StreamMessage { + ErrorEvent, + Message(String, MessageEvent), +} + +impl Stream for EventSourceSubscription { + type Item = Result<(String, MessageEvent), EventSourceError>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let msg = ready!(self.project().message_receiver.poll_next(cx)); + match msg { + Some(StreamMessage::Message(event_type, msg)) => { + Poll::Ready(Some(Ok((event_type, msg)))) + } + Some(StreamMessage::ErrorEvent) => { + Poll::Ready(Some(Err(EventSourceError::ConnectionError))) + } + None => Poll::Ready(None), + } + } +} + +#[pinned_drop] +impl PinnedDrop for EventSourceSubscription { + fn drop(self: Pin<&mut Self>) { + let _ = self.es.remove_event_listener_with_callback( + "error", + self.error_callback.as_ref().unchecked_ref(), + ); + + let _ = self.es.remove_event_listener_with_callback( + &self.event_type, + self.message_callback.as_ref().unchecked_ref(), + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + use wasm_bindgen_futures::spawn_local; + use wasm_bindgen_test::*; + + wasm_bindgen_test_configure!(run_in_browser); + + const SSE_ECHO_SERVER_URL: &str = env!("SSE_ECHO_SERVER_URL"); + + #[wasm_bindgen_test] + fn eventsource_works() { + let mut es = EventSource::new(SSE_ECHO_SERVER_URL).unwrap(); + let mut servers = es.subscribe("server").unwrap(); + let mut requests = es.subscribe("request").unwrap(); + + spawn_local(async move { + assert_eq!(servers.next().await.unwrap().unwrap().0, "server"); + assert_eq!(requests.next().await.unwrap().unwrap().0, "request"); + }); + } + + #[wasm_bindgen_test] + fn eventsource_connect_failure_works() { + let mut es = EventSource::new("rubbish").unwrap(); + let mut servers = es.subscribe("server").unwrap(); + + spawn_local(async move { + // we should expect an immediate failure + + assert_eq!( + servers.next().await, + Some(Err(EventSourceError::ConnectionError)) + ); + }) + } +} diff --git a/crates/net/src/eventsource/mod.rs b/crates/net/src/eventsource/mod.rs new file mode 100644 index 00000000..ee28d883 --- /dev/null +++ b/crates/net/src/eventsource/mod.rs @@ -0,0 +1,39 @@ +//! Wrapper around the `EventSource` API +//! +//! This API is provided in the following flavors: +//! - [Futures API][futures] + +pub mod futures; + +use std::fmt; + +/// The state of the EventSource. +/// +/// See [`EventSource.readyState` on MDN](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/readyState) +/// to learn more. +#[derive(Copy, Clone, Debug)] +pub enum State { + /// The connection has not yet been established. + Connecting, + /// The EventSource connection is established and communication is possible. + Open, + /// The connection has been closed or could not be opened. + Closed, +} + +/// Error returned by the EventSource +#[derive(Clone, Debug, Eq, PartialEq)] +#[non_exhaustive] +#[allow(missing_copy_implementations)] +pub enum EventSourceError { + /// The `error` event + ConnectionError, +} + +impl fmt::Display for EventSourceError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + EventSourceError::ConnectionError => write!(f, "EventSource connection failed"), + } + } +} diff --git a/crates/net/src/lib.rs b/crates/net/src/lib.rs index d3cabced..0a9cf638 100644 --- a/crates/net/src/lib.rs +++ b/crates/net/src/lib.rs @@ -11,6 +11,9 @@ #![cfg_attr(docsrs, feature(doc_cfg))] mod error; +#[cfg(feature = "eventsource")] +#[cfg_attr(docsrs, doc(cfg(feature = "eventsource")))] +pub mod eventsource; #[cfg(feature = "http")] #[cfg_attr(docsrs, doc(cfg(feature = "http")))] pub mod http; diff --git a/crates/net/src/websocket/futures.rs b/crates/net/src/websocket/futures.rs index e2289de6..d6ac503e 100644 --- a/crates/net/src/websocket/futures.rs +++ b/crates/net/src/websocket/futures.rs @@ -313,11 +313,11 @@ mod tests { wasm_bindgen_test_configure!(run_in_browser); - const ECHO_SERVER_URL: &str = env!("ECHO_SERVER_URL"); + const WS_ECHO_SERVER_URL: &str = env!("WS_ECHO_SERVER_URL"); #[wasm_bindgen_test] fn websocket_works() { - let ws = WebSocket::open(ECHO_SERVER_URL).unwrap(); + let ws = WebSocket::open(WS_ECHO_SERVER_URL).unwrap(); let (mut sender, mut receiver) = ws.split(); spawn_local(async move { @@ -333,8 +333,8 @@ mod tests { spawn_local(async move { // ignore first message - // the echo-server used sends it's info in the first message - // let _ = ws.next().await; + // the echo-server uses it to send it's info in the first message + let _ = receiver.next().await; assert_eq!( receiver.next().await.unwrap().unwrap(),