Skip to content

Provides an EventSource implementation #246

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ jobs:
ports:
- 8080:80
echo_server:
image: jmalloc/echo-server@sha256:c461e7e54d947a8777413aaf9c624b4ad1f1bac5d8272475da859ae82c1abd7d
image: jmalloc/echo-server@sha256:e43a10c9ecbd025df7ed6dac1e45551ce7bd676142600b0734fe7dcd10a47abe
ports:
- 8081:8080

Expand Down Expand Up @@ -162,7 +162,8 @@ jobs:
- name: Run browser tests
env:
HTTPBIN_URL: "http://localhost:8080"
ECHO_SERVER_URL: "ws://localhost:8081"
WS_ECHO_SERVER_URL: "ws://localhost:8081"
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
run: |
cd crates/net
wasm-pack test --chrome --firefox --headless --all-features
Expand All @@ -177,7 +178,8 @@ jobs:
- name: Run native tests
env:
HTTPBIN_URL: "http://localhost:8080"
ECHO_SERVER_URL: "ws://localhost:8081"
WS_ECHO_SERVER_URL: "ws://localhost:8081"
SSE_ECHO_SERVER_URL: "http://localhost:8081/.sse"
uses: actions-rs/cargo@v1
with:
command: test
Expand Down
12 changes: 11 additions & 1 deletion crates/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ wasm-bindgen-test = "0.3"
futures = "0.3"

[features]
default = ["json", "websocket", "http"]
default = ["json", "websocket", "http", "eventsource"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I am reluctant to putting "too" many features into the default one, but as we already have websocket here I guess we can also have eventsource.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, seemed to make sense to continue following the convention.


# Enables `.json()` on `Response`
json = ["serde", "serde_json"]
Expand Down Expand Up @@ -79,3 +79,13 @@ http = [
'web-sys/Blob',
'web-sys/FormData',
]
# Enables the EventSource API
eventsource = [
"futures-channel",
"futures-core",
"pin-project",
'web-sys/Event',
'web-sys/EventTarget',
'web-sys/EventSource',
'web-sys/MessageEvent',
]
22 changes: 21 additions & 1 deletion crates/net/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<sub>Built with 🦀🕸 by <a href="https://rustwasm.github.io/">The Rust and WebAssembly Working Group</a></sub>
</div>

HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` `fetch` and `WebSocket` API
HTTP requests library for WASM Apps. It provides idiomatic Rust bindings for the `web_sys` [`fetch`](https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API), [`WebSocket`](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) and [`EventSource`](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) APIs.

## Examples

Expand Down Expand Up @@ -55,3 +55,23 @@ spawn_local(async move {
console_log!("WebSocket Closed")
})
```

### EventSource

```rust
use gloo_net::eventsource::futures::EventSource;
use wasm_bindgen_futures::spawn_local;
use futures::{stream, StreamExt};

let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
let stream_1 = es.subscribe("some-event-type").unwrap();
let stream_2 = es.subscribe("another-event-type").unwrap();

spawn_local(async move {
let mut all_streams = stream::select(stream_1, stream_2);
while let Some(Ok((event_type, msg))) = all_streams.next().await {
console_log!(format!("1. {}: {:?}", event_type, msg))
}
console_log!("EventSource Closed");
})
```
4 changes: 2 additions & 2 deletions crates/net/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub enum Error {
),
}

#[cfg(any(feature = "http", feature = "websocket"))]
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
pub(crate) use conversion::*;
#[cfg(any(feature = "http", feature = "websocket"))]
#[cfg(any(feature = "http", feature = "websocket", feature = "eventsource"))]
mod conversion {
use gloo_utils::errors::JsError;
use std::convert::TryFrom;
Expand Down
252 changes: 252 additions & 0 deletions crates/net/src/eventsource/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
//! A wrapper around the `EventSource` API using the Futures API to be used with async rust.
//!
//! EventSource is similar to WebSocket with the major differences being:
//!
//! * they are a one-way stream of server generated events
//! * their connection is managed entirely by the browser
//! * their data is slightly more structured including an id, type and data
//!
//! EventSource is therefore suitable for simpler scenarios than WebSocket.
//!
//! See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events) to learn more.
//!
//! # Example
//!
//! ```rust
//! use gloo_net::eventsource::futures::EventSource;
//! use wasm_bindgen_futures::spawn_local;
//! use futures::{stream, StreamExt};
//!
//! # macro_rules! console_log {
//! # ($($expr:expr),*) => {{}};
//! # }
//! # fn no_run() {
//! let mut es = EventSource::new("http://api.example.com/ssedemo.php").unwrap();
//! let stream_1 = es.subscribe("some-event-type").unwrap();
//! let stream_2 = es.subscribe("another-event-type").unwrap();
//!
//! spawn_local(async move {
//! let mut all_streams = stream::select(stream_1, stream_2);
//! while let Some(Ok((event_type, msg))) = all_streams.next().await {
//! console_log!(format!("1. {}: {:?}", event_type, msg))
//! }
//! console_log!("EventSource Closed");
//! })
//! # }
//! ```
use crate::eventsource::{EventSourceError, State};
use crate::js_to_js_error;
use futures_channel::mpsc;
use futures_core::{ready, Stream};
use gloo_utils::errors::JsError;
use pin_project::{pin_project, pinned_drop};
use std::pin::Pin;
use std::task::{Context, Poll};
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
use web_sys::MessageEvent;

/// Wrapper around browser's EventSource API. Dropping
/// this will close the underlying event source.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct EventSource {
es: web_sys::EventSource,
}

/// Wrapper around browser's EventSource API.
#[allow(missing_debug_implementations)]
#[pin_project(PinnedDrop)]
pub struct EventSourceSubscription {
#[allow(clippy::type_complexity)]
error_callback: Closure<dyn FnMut(web_sys::Event)>,
es: web_sys::EventSource,
event_type: String,
message_callback: Closure<dyn FnMut(MessageEvent)>,
#[pin]
message_receiver: mpsc::UnboundedReceiver<StreamMessage>,
}

impl EventSource {
/// Establish an EventSource.
///
/// This function may error in the following cases:
/// - The connection url is invalid
///
/// The error returned is [`JsError`]. See the
/// [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/EventSource#exceptions_thrown)
/// to learn more.
pub fn new(url: &str) -> Result<Self, JsError> {
let es = web_sys::EventSource::new(url).map_err(js_to_js_error)?;

Ok(Self { es })
}

/// Subscribes to listening for a specific type of event.
///
/// All events for this type are streamed back given the subscription
/// returned.
///
/// The event type of "message" is a special case, as it will capture
/// events without an event field as well as events that have the
/// specific type `event: message`. It will not trigger on any
/// other event type.
pub fn subscribe(&mut self, event_type: &str) -> Result<EventSourceSubscription, JsError> {
let event_type = event_type.to_string();

let (message_sender, message_receiver) = mpsc::unbounded();

let message_callback: Closure<dyn FnMut(MessageEvent)> = {
let event_type = event_type.clone();
let sender = message_sender.clone();
Closure::wrap(Box::new(move |e: MessageEvent| {
let event_type = event_type.clone();
let _ = sender.unbounded_send(StreamMessage::Message(event_type, e));
}) as Box<dyn FnMut(MessageEvent)>)
};

self.es
.add_event_listener_with_callback(
&event_type,
message_callback.as_ref().unchecked_ref(),
)
.map_err(js_to_js_error)?;

let error_callback: Closure<dyn FnMut(web_sys::Event)> = {
let sender = message_sender.clone();
Closure::wrap(Box::new(move |e: web_sys::Event| {
let is_connecting = e
.current_target()
.map(|target| target.unchecked_into::<web_sys::EventSource>())
.map(|es| es.ready_state() == web_sys::EventSource::CONNECTING)
.unwrap_or(false);
if !is_connecting {
let _ = sender.unbounded_send(StreamMessage::ErrorEvent);
};
}) as Box<dyn FnMut(web_sys::Event)>)
};

self.es
.add_event_listener_with_callback("error", error_callback.as_ref().unchecked_ref())
.map_err(js_to_js_error)?;

Ok(EventSourceSubscription {
error_callback,
es: self.es.clone(),
event_type,
message_callback,
message_receiver,
})
}

/// Closes the EventSource.
///
/// See the [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/close#parameters)
/// to learn about this function
pub fn close(mut self) {
self.close_and_notify();
}

fn close_and_notify(&mut self) {
self.es.close();
// Fire an error event to cause all subscriber
// streams to close down.
if let Ok(event) = web_sys::Event::new("error") {
let _ = self.es.dispatch_event(&event);
}
}

/// The current state of the EventSource.
pub fn state(&self) -> State {
let ready_state = self.es.ready_state();
match ready_state {
0 => State::Connecting,
1 => State::Open,
2 => State::Closed,
_ => unreachable!(),
}
}
}

impl Drop for EventSource {
fn drop(&mut self) {
self.close_and_notify();
}
}

#[derive(Clone)]
enum StreamMessage {
ErrorEvent,
Message(String, MessageEvent),
}

impl Stream for EventSourceSubscription {
type Item = Result<(String, MessageEvent), EventSourceError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let msg = ready!(self.project().message_receiver.poll_next(cx));
match msg {
Some(StreamMessage::Message(event_type, msg)) => {
Poll::Ready(Some(Ok((event_type, msg))))
}
Some(StreamMessage::ErrorEvent) => {
Poll::Ready(Some(Err(EventSourceError::ConnectionError)))
}
None => Poll::Ready(None),
}
}
}

#[pinned_drop]
impl PinnedDrop for EventSourceSubscription {
fn drop(self: Pin<&mut Self>) {
let _ = self.es.remove_event_listener_with_callback(
"error",
self.error_callback.as_ref().unchecked_ref(),
);

let _ = self.es.remove_event_listener_with_callback(
&self.event_type,
self.message_callback.as_ref().unchecked_ref(),
);
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use wasm_bindgen_futures::spawn_local;
use wasm_bindgen_test::*;

wasm_bindgen_test_configure!(run_in_browser);

const SSE_ECHO_SERVER_URL: &str = env!("SSE_ECHO_SERVER_URL");

#[wasm_bindgen_test]
fn eventsource_works() {
let mut es = EventSource::new(SSE_ECHO_SERVER_URL).unwrap();
let mut servers = es.subscribe("server").unwrap();
let mut requests = es.subscribe("request").unwrap();

spawn_local(async move {
assert_eq!(servers.next().await.unwrap().unwrap().0, "server");
assert_eq!(requests.next().await.unwrap().unwrap().0, "request");
});
}

#[wasm_bindgen_test]
fn eventsource_connect_failure_works() {
let mut es = EventSource::new("rubbish").unwrap();
let mut servers = es.subscribe("server").unwrap();

spawn_local(async move {
// we should expect an immediate failure

assert_eq!(
servers.next().await,
Some(Err(EventSourceError::ConnectionError))
);
})
}
}
39 changes: 39 additions & 0 deletions crates/net/src/eventsource/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Wrapper around the `EventSource` API
//!
//! This API is provided in the following flavors:
//! - [Futures API][futures]

pub mod futures;

use std::fmt;

/// The state of the EventSource.
///
/// See [`EventSource.readyState` on MDN](https://developer.mozilla.org/en-US/docs/Web/API/EventSource/readyState)
/// to learn more.
#[derive(Copy, Clone, Debug)]
pub enum State {
/// The connection has not yet been established.
Connecting,
/// The EventSource connection is established and communication is possible.
Open,
/// The connection has been closed or could not be opened.
Closed,
}

/// Error returned by the EventSource
#[derive(Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
#[allow(missing_copy_implementations)]
pub enum EventSourceError {
/// The `error` event
ConnectionError,
}

impl fmt::Display for EventSourceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
EventSourceError::ConnectionError => write!(f, "EventSource connection failed"),
}
}
}
3 changes: 3 additions & 0 deletions crates/net/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

mod error;
#[cfg(feature = "eventsource")]
#[cfg_attr(docsrs, doc(cfg(feature = "eventsource")))]
pub mod eventsource;
#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
pub mod http;
Expand Down
Loading