
A new Websocket type and crate. #49


Closed · thedodd opened this issue Mar 25, 2019 · 35 comments · Fixed by #191


thedodd commented Mar 25, 2019

Summary

This is a proposal for a new WebSocket abstraction: one crate, with a submodule for a callbacks-based type and another submodule that builds on top of it to provide futures support.

Motivation

WebSockets are fundamental in the web world, and JS land has a plethora of WebSocket libraries with simple APIs. Let's build something on par with these APIs in order to provide the best possible experience.

Without the improvements proposed here:

  • users will have to resort to the callbacks-based usage pattern provided by the web_sys::WebSocket type.
  • users may resort to leaking the memory of the WebSocket callbacks they pass to JS land, as there is currently no abstraction for storing those callbacks.
  • there is some complexity in extracting binary data from a socket frame, with stumbling blocks which could lead to runtime panics.
  • there is currently no support for automatically reconnecting a WebSocket when a connection is lost.

Detailed Explanation

See the original discussion in the seed project.

Expand each of the sections below for more details on the two types of WebSocket abstractions being proposed. Everything here has been updated per our discussions in this thread up to this point.

callbacks / events based design

websocket with callbacks / events

The plan here is to build on top of the gloo_events crate, which has not yet landed as of 2019.04.04, but which should be landing quite soon.

The essential idea is that we build a new WebSocket type which exposes a builder pattern for creating new instances. This will allow users to easily register their callbacks while building the WebSocket instance. The constructor will then wrap any given callbacks in a gloo_events::EventHandler, register each handler on the appropriate event coming from the web_sys::WebSocket, and retain the handlers for proper cleanup when the WebSocket is dropped.

This abstraction will also enable the futures based WebSocket type (discussed below) to easily build upon this events based WebSocket type.

builder interface example

// Begin building a new WebSocket instance.
// The given `on*` closures will be wrapped by `gloo_events::EventHandler`s.
let ws = WebSocket::connect("/ws")
    // Any protocols which the user would like this connection to use may be specified here.
    .protocols(&["custom.proto.com"])

    // For `onmessage`, users register an FnMut which takes the unpacked
    // data from the received frame. The `WsMessage` enum instance indicates
    // if the message was text or binary. See below for the reasoning behind this.
    .onmessage(|event: WsMessage| ())

    // The other `on*` handlers will be given the raw event object.
    .onopen(|event: Event| ())
    .onerror(|event: Event| ())
    .onclose(|event: Event| ())

    // By default, instances will be configured to reconnect using `ReconnectConfig::default()`.
    // For more control, users may supply a customized reconnect config.
    .reconnect(reconnect_config)

    // If users want this instance to not perform reconnects at all, they must call this instead.
    .no_reconnect()

    .build(); // Finalize the build, returning a new WebSocket instance.

/// An enumeration of the different types of WebSocket messages which may be received.
/// 
/// The gloo WebSocket will unpack received messages and present them as the appropriate variant.
pub enum WsMessage {
    Text(String),
    Binary(Vec<u8>),
}

reconnecting

Reconnects will be handled by inserting some logic in the onclose handler (even if a user has not registered an onclose callback). We will probably just use Window.set_timeout_with_callback_and_timeout_and_arguments_0 for scheduling the reconnect events based on the backoff algorithm.

We will blow away the old web_sys::WebSocket, build the new one and register the same original EventHandlers. When the new connection opens, we will update the retry state as needed. If an error takes place and the new WebSocket is closed before it ever goes into an open state, we will proceed with the retry algorithm.

We will most likely just use Rc<Cell<_>> or Rc<RefCell<_>> on the web_sys::WebSocket and the reconnect config instance for internally mutating them.
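For illustration, a rough sketch of how a reconnect attempt might be scheduled with that API (the schedule_reconnect helper and the delay computation are illustrative, not part of the proposal):

use wasm_bindgen::{closure::Closure, JsCast};

// Schedule a reconnect attempt after `delay_ms` (computed by the backoff
// algorithm). The `reconnect` closure would rebuild the web_sys::WebSocket
// and re-register the retained event handlers.
fn schedule_reconnect(delay_ms: i32, reconnect: Closure<dyn FnMut()>) {
    web_sys::window()
        .expect("no global `window`")
        .set_timeout_with_callback_and_timeout_and_arguments_0(
            reconnect.as_ref().unchecked_ref(),
            delay_ms,
        )
        .expect("failed to schedule reconnect");
    // The closure must be kept alive until the timeout fires, e.g. stored on
    // the WebSocket wrapper; `forget` leaks it and is used here only for brevity.
    reconnect.forget();
}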

futures based design

websocket with futures

This type will implement the futures Stream + Sink traits which will allow for futures-based reading and writing on the underlying WebSocket, and is built on top of the gloo_events based type described above.

We are also planning to implement AsyncRead + AsyncWrite on this type (per @yoshuawuyts' recommendation), which will allow folks to use the tokio_codec::{Encoder, Decoder} traits for more robust framing on top of the WebSocket.

A set of Rust enums is used to represent the various event types and variants, as all events will come through an internally held futures::sync::mpsc::UnboundedReceiver.

/// An enumeration of all possible event types coming from the underlying WebSocket.
///
/// These are all sent through a futures unbounded channel which the callbacks
/// publish to from within the callbacks-based WebSocket type.
pub enum WSEvent {
    Open(Event),
    Message(WsMessage), // WsMessage is described in the callbacks-based section above.
    Error(Event),
    Close(Event),
}

A few examples of how to build an instance of this futures-based type:

// Will build an instance with the default reconnect config.
let ws = WebSocket::connect("/ws");

// Will build an instance which is not configured to reconnect.
let ws = WebSocket::no_reconnect("/ws");

// Will build an instance which uses a custom reconnect config.
let ws = WebSocket::custom_reconnect("/ws", cfg);

// Use sink to send messages & stream to receive messages.
// This comes from https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.split
let (sink, stream) = ws.split();

// This type also impls AsyncRead + AsyncWrite, so that users can implement
// their own framing protocols (thanks @yoshuawuyts).
use MyEncoderDecoder; // User's custom Encoder + Decoder.
let proto_stream = MyEncoderDecoder::framed(ws)?;
let (mut reader, mut writer) = proto_stream.split(); // Framed reader & writer.

Stream + Sink & AsyncRead + AsyncWrite

Most of this work is actually done. A pull request will be opened soon so that we can start looking at an actual implementation.

reconnecting

Reconnecting will actually be handled by the underlying events-based WebSocket instance. Nothing new should need to be implemented here.

Unresolved Questions

  • For the futures based type: what do we want to communicate to users of the AsyncRead + AsyncWrite impls, inasmuch as they would need to treat all data sent and received as binary data (server side included)?
  • This only applies to reconnect-enabled futures-based instances: do we want to buffer outbound frames? If so, for how long? Else, should we just return Ok(AsyncSink::NotReady(msg)) when the underlying WebSocket is being rebuilt?
  • Need to discuss the reconnect config system that we will use. Should we implement our own, or should we use some exponential backoff crate already available?

Pauan commented Mar 28, 2019

> The callbacks can be simply stored in an Rc wrapped inside of an enum to account for function type variance (the Rc may not be needed). This is what I did. Worked out pretty well.

With gloo-events, it will be possible to store them directly, as EventListener:

struct WebSocket {
    events: Vec<EventListener>,
}

This makes it a bit tricky to remove a particular event though, so you'd probably need Rc + RefCell for that.
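For example, a minimal sketch of that shared storage (the Listeners wrapper and keying listeners by event type are illustrative):

use std::{cell::RefCell, rc::Rc};
use gloo_events::EventListener;

// Keep (event_type, listener) pairs behind Rc<RefCell<_>> so a particular
// listener can be removed by name; dropping an EventListener unregisters it.
#[derive(Clone)]
struct Listeners(Rc<RefCell<Vec<(&'static str, EventListener)>>>);

impl Listeners {
    fn remove(&self, event_type: &str) {
        self.0.borrow_mut().retain(|(ty, _)| *ty != event_type);
    }
}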

> The only potential drawback discussed so far is the name. web-sys uses the name Websocket and the type being proposed here would bear the same name.

Is that a downside? Do you think users will get confused between web_sys::WebSocket and gloo::websockets::WebSocket?


thedodd commented Mar 28, 2019

@Pauan as far as storage of event listeners, that sounds great.

As far as the name being a potential drawback, I honestly don't think it will be an issue. It was the only thing that I could think of as a detractor though :).

Thanks for the feedback!

@najamelan

@thedodd you could have a look at #25


fitzgen commented Mar 29, 2019

@thedodd were you going to add a sketch of types and function/method signatures to this design proposal, as discussed in the WG meeting?


thedodd commented Mar 29, 2019

@fitzgen yes, will do. I added a TODO item at the bottom of the description :). Will hopefully be able to get around to it shortly.


thedodd commented Apr 2, 2019

@fitzgen @Pauan here are my thoughts on the design so far. This essentially communicates what I had in mind, and it is compatible with stable Rust.

The main thing I am looking for feedback on is what you all think of the SplitSink & SplitStream as the main abstraction for using this type (as most users will want to read from and write to the socket). It is pretty much perfectly in line with the futures ecosystem, so I suspect this won't be a big deal. However, any and all feedback is welcome.

I will update the main body of this issue based on our discussion here.

EDIT: a distillation of this content has been moved up to the opening body of this issue. The original content here has been preserved, but put into a collapsible section for brevity.

Original Design Post

A high-level futures-based API for Websockets built on top of web-sys.

This type will implement both futures Stream & Sink which will allow for futures-based reading and writing on the underlying Websocket.

The internal interface of this type manages all aspects of the underlying Websocket, including handling all of its events in order to drive reconnects. A set of Rust enums is used to represent the various event types and variants, as well as the Websocket state. In order to interface with the web-sys callback-based event handling, this type uses futures::mpsc channels to receive the events coming from the underlying Websocket.

initial design

The following is an example of how to build an instance of the gloo Websocket type.

// Build a new Websocket instance.
// EDIT: Per some initial discussion, the second argument is a place holder
// for some exponential backoff pattern. Needs more discussion, but the idea
// is that this is where reconnect patterns are configured.
let ws = Websocket::new("wss://api.example.com/ws", Some(5));

// Use sink to send messages & stream to receive messages.
// This comes from https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.split
let (sink, stream) = ws.split();

Internally, the Websocket will look something like this.

struct Websocket {
    /// The underlying Websocket instance.
    /// 
    /// If this instance is configured to reconnect, this web_sys::WebSocket will be swapped out
    /// on reconnects.
    ws: web_sys::WebSocket,

    /// The optional configuration for handling reconnects.
    reconnect: Option<u32>,

    /// The channel receiver used for streaming in the events from the underlying Websocket.
    /// 
    /// The sending side is used when building the 4 `on_*` closures sent over to JS land. We do
    /// not retain it as we should never need it again after this type is built.
    receiver: UnboundedReceiver<WSEvent>,

    /// An array of the already cast wasm-bindgen closures used internally by this type.
    /// 
    /// Their ordering is as follows:
    /// 
    /// 1. on_message
    /// 2. on_open
    /// 3. on_error
    /// 4. on_close
    /// 
    /// **NB:** The ordering here is very important. In order to avoid having to recast the
    /// various closures when we need to reconnect, we store the 4 different closures as
    /// `Rc<js_sys::Function>`s and then we ensure that we pass them to the appropriate handlers
    /// during reconnect.
    /// 
    /// ALTERNATIVELY: we could just store these in four different fields as their closure types
    /// and then re-cast whenever we need to do a reconnect.
    callbacks_internal: [Rc<js_sys::Function>; 4],

    /// For non-reconnecting instances, this will be true when the underlying Websocket is closed.
    /// 
    /// At that point in time, the next iteration of this instance's stream will return `None` &
    /// any attempts to send messages via this sink will immediately return an error.
    is_closed: bool,
}

enum WSEvent {
    Open(Event),
    Message(WSMessage),
    Error(Event),
    Close(Event),
}

/// An enumeration of the different types of Websocket messages which may be received.
/// 
/// The gloo Websocket will unpack received messages and present them as the appropriate variant.
enum WSMessage {
    Text(String),
    Binary(Vec<u8>),
}

stream | sink | split

Given the above types, we can implement Stream & Sink on Websocket so that it may be used for sending and receiving Websocket messages asynchronously.

sink

Sink will be a very simple implementation. We will implement Sink over WSMessage, as shown above, which will allow users to send simple string-based messages, and will also allow them to send more sophisticated binary data using the same Sink interface.

Ultimately, no buffering will be employed by this sink implementation. Reconnecting instances will simply return NotReady when the underlying Websocket is being rebuilt per an error or close event. Any call to start_send will start and finish the send operation on the underlying Websocket.
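A rough sketch of these Sink semantics (futures 0.1; the is_reconnecting helper and the WsError type are illustrative, not settled API):

use futures::{Async, AsyncSink, Poll, Sink, StartSend};

impl Sink for Websocket {
    type SinkItem = WSMessage;
    type SinkError = WsError; // hypothetical error type wrapping JsValue

    fn start_send(&mut self, msg: WSMessage) -> StartSend<WSMessage, WsError> {
        if self.is_reconnecting() {
            // No buffering: hand the message back to the caller while the
            // underlying socket is being rebuilt.
            return Ok(AsyncSink::NotReady(msg));
        }
        match msg {
            WSMessage::Text(text) => self.ws.send_with_str(&text).map_err(WsError::from)?,
            WSMessage::Binary(mut data) => self.ws.send_with_u8_array(&mut data).map_err(WsError::from)?,
        }
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), WsError> {
        // start_send already completed the send; there is nothing to flush.
        Ok(Async::Ready(()))
    }
}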

stream

Stream will also be a very simple implementation. It will mutably borrow the mpsc::UnboundedReceiver which is encapsulated by this type. All events from the underlying Websocket will come through this stream.

split

Many users of this type will need to read from and write to the Websocket. Use of Stream.split is the recommended way of managing this. The two handles to this type may then be used throughout the user's app as needed.

reconnecting

It would seem that the only logical location for the reconnect logic to be driven from would be the stream impl. The stream is where error & close events from the underlying Websocket will be detected. This means that the Websocket stream must be polled in order for the reconnect functionality to work. I suspect this will hardly be an issue, as most users will want to be reading from the stream already, and for those who do not, spawning a .for_each() future is simple enough.


yoshuawuyts commented Apr 2, 2019

edit: my post here is about a high-level API. If the proposal was about a mid-level API, then I'm sorry if this is slightly derailing things!

So something that comes to mind with WebSockets is that when using them, people will likely want to add their own framing on top to convert raw messages into actual structs. It could be as simple as decoding some JSON, but perhaps also more involved, with custom headers and parsing steps.

A crate such as tokio-codec allows creating reusable parsers through the Encoder and Decoder traits, which convert AsyncRead -> Decoder -> Stream, and AsyncWrite -> Encoder -> Sink.

In practice this means that if our websocket abstraction could be a duplex of AsyncRead and AsyncWrite, all userland parsers that know how to convert those to duplex streams would work out of the box, with little to no changes needed. This would essentially turn a websocket into something more closely resembling OS sockets.

Examples

echo client
receive, decode, re-encode, and send back

use my_protocol;

let socket = await? WebSocket::connect("/ws");
let proto_stream = my_protocol::frame(socket)?;

let (mut reader, mut writer) = proto_stream.split();
await? reader.copy_into(&mut writer);

print client
receive, parse, and print to console

use my_protocol;

let socket = await? WebSocket::connect("/ws");
let proto_stream = my_protocol::frame(socket)?;

for await? item in proto_stream {
    println!("msg received {:?}", item?);
}


thedodd commented Apr 3, 2019

@yoshuawuyts that is excellent feedback. Those traits would not only give us the benefits outlined here, but also the ones you've described.

Ok, I’ll update the design with that in mind. I think that is in line with what @najamelan was doing as well.


Pauan commented Apr 3, 2019

> The second optional argument will cause the Websocket to handle disconnects by automatically attempting to reconnect at the given interval in seconds.

I normally try to avoid harsh language, but this seems really wrong.

Imagine a server is connected to 1 million WebSocket clients (which is not unreasonable). The server goes down (maybe for routine maintenance, maybe a crash).

All 1 million WebSocket clients attempt to reconnect at the same time. This of course fails, so then they try to reconnect again 5 seconds later, then another 5 seconds later...

This causes an incredibly massive "thundering herd" of reconnection attempts which overwhelms the network, which can cause other servers to fail under the pressure, which then causes a further cascade of server crashes...

The correct thing to do is to use exponential backoff to progressively slow down the rate of reconnecting.

It also needs to use some randomization to prevent the "thundering herd" problem of millions of clients attempting to reconnect at the same time.

Because this is such an important problem, and it's hard to get it right, we should just Do The Right Thing and handle the exponential backoff internally.

(Exponential backoff is also used in other areas, for example to prevent lock contention in databases.)
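For concreteness, a minimal sketch of exponential backoff with full jitter (the constants and the use of js_sys::Math::random for jitter are illustrative):

// Double the base delay each attempt, cap it, then pick a uniformly random
// point in [0, cap) so millions of clients do not retry in lock-step.
fn backoff_delay_ms(attempt: u32) -> u32 {
    const BASE_MS: u32 = 500;
    const MAX_MS: u32 = 30_000;
    let capped = BASE_MS.saturating_mul(2u32.saturating_pow(attempt)).min(MAX_MS);
    (js_sys::Math::random() * capped as f64) as u32
}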

> enum WSEvent

Is there any need for the Close event? Can't that be handled by the Stream returning None?

Similarly, I imagine Error isn't needed either, since that will just cause the WebSocket to retry.

Is there any use case for Open, or can we just remove WSEvent entirely?

> Ultimately, no buffering will be employed by this sink implementation.

Why not? Since reconnecting will be an important use case, that essentially means that users will have to implement their own buffering strategy, which doesn't seem better than having it be built-in.

> Reconnecting instances will simply return NotReady when the underlying Websocket is being rebuilt per an error or close event.

I don't see NotReady mentioned anywhere.

> The stream is the location where error & close events from the underlying Websocket will be detected. This means that the Websocket stream must be polled in order for the reconnect functionality to work.

It should be possible to handle that all internally, inside of the actual error and close event callbacks.


Pauan commented Apr 3, 2019

I suppose one benefit of the Open and Close enum variants is that they (potentially) allow for the application to show a message/loading screen when the WebSocket goes down.

But in that case it needs to pass more information, such as how long until the next retry.


thedodd commented Apr 3, 2019

retry / backoff

Yes, I almost included exponential backoff in the design above. It was the first thing I reached for, but decided to go with a more simple proposal so that we could focus on the details of the WebSocket stream+sink abstraction first. I'll go ahead and add an // EDIT: item to the above design about using exp backoff.

WSEvent

With this model, the WSEvent enum will be used by the callbacks given to the web_sys::WebSocket. These closures pass their events over the unbounded sender which corresponds to the receiver you see above. They use WSEvent so that they can all use the same mpsc channel. I mention all of this in the comment above.

The variants of that enum are used to drive the logic for reconnect, disconnect &c. That is why they are needed. I was planning on forwarding those events as well so that users can trigger custom events in their apps.
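For illustration, roughly how one of those closures might forward events over the channel (futures 0.1; the register_onmessage helper and the text-only handling are illustrative):

use futures::sync::mpsc::UnboundedSender;
use wasm_bindgen::{closure::Closure, JsCast};

// `tx` is the sending half matching the receiver held by the gloo type; all
// four callbacks share it, which is why they all use the same WSEvent enum.
fn register_onmessage(ws: &web_sys::WebSocket, tx: UnboundedSender<WSEvent>) {
    let onmessage = Closure::wrap(Box::new(move |event: web_sys::MessageEvent| {
        // Binary frames would be unpacked into WSMessage::Binary (elided here).
        if let Some(text) = event.data().as_string() {
            let _ = tx.unbounded_send(WSEvent::Message(WSMessage::Text(text)));
        }
    }) as Box<dyn FnMut(web_sys::MessageEvent)>);
    ws.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
    // The closure must be retained (e.g. in `callbacks_internal`); `forget`
    // leaks it and is used here only for brevity.
    onmessage.forget();
}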

sink buffering

The choice not to buffer is due to the nature of the type of messages. They are frames to be sent over the socket, sure, but they are analogous to a network request. If we choose to buffer, then we need to consider setting up timeouts on the buffered frames, and this adds a lot of complexity. Read on ...

The bit about NotReady is in the context of using Sink, where I also mention Sink.start_send(). The Sink trait's start_send must return a result of AsyncSink. Its NotReady variant is generic over the Sink's item type, and will return the element which was given to be sent through the sink.

Failing a user-initiated network request when the network is disconnected seems pretty logical. It draws attention to the problem immediately. If we buffer, it will cause the appearance of the request being in progress and simply waiting for a response; when in reality, the request has not even been sent yet. IMHO, better to just fail the request immediately, remove the perceived latency, and just return NotReady(msg) from the start_send(msg) call.

As these sorts of things go, it is going to be six of one and half a dozen of the other. Different apps have different needs ... so, see the next section.

builder pattern

I was also considering introducing a builder pattern for the WebSocket at first so that we could do more complex configuration based on reconnects / buffering &c. @Pauan if you think we really need to support Sink buffering and some of the other WG folks agree, I am happy to put together a design which includes a builder pattern which will allow us to more clearly configure things like buffering, buffer timeouts, exponential backoff config and the like.

Thoughts?


thedodd commented Apr 3, 2019

@yoshuawuyts another thing which I take from your comment above is that we may want to support short-hand connection strings. I.e., if a user provides a connection string of "/ws", we will parse it, see that it is not a valid URL, and in such a case use the window's protocol & host and then append the "/ws" to the end.

Is this something you think we should look into as well?
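For concreteness, a quick sketch of what that resolution might look like (error handling elided; mapping http(s) to ws(s) based on the page protocol is an assumption about the desired behavior):

// Resolve a short-hand path like "/ws" against the current page's
// protocol & host, leaving full ws:// and wss:// URLs untouched.
fn resolve_ws_url(input: &str) -> String {
    if input.starts_with("ws://") || input.starts_with("wss://") {
        return input.to_string();
    }
    let location = web_sys::window().expect("no global `window`").location();
    let proto = match location.protocol().expect("no protocol").as_str() {
        "https:" => "wss:",
        _ => "ws:",
    };
    let host = location.host().expect("no host"); // e.g. "example.com:8080"
    format!("{}//{}{}", proto, host, input)
}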


thedodd commented Apr 3, 2019

@yoshuawuyts a concern that I have about implementing AsyncRead + AsyncWrite on this type is that we would be forced to deal with all data sent and received as binary data, essentially discarding a frame's opcode indicating if it is a string or bytes frame. The encoder expects to be able to send all data as bytes, and the decoder expects to decode an item out of a bytes buffer.

We can certainly do this, but we will have to communicate a disclaimer to users that all data sent via the AsyncWrite will be framed with the binary opcode in the WebSocket frame. Similarly, we will have to make a choice on how to handle messages from the AsyncRead side. Should we just shoehorn all string and binary frames into the bytes buf? We can, but we will have to communicate this.

The implementation is simple enough, I just wanted to bring it up. I'm certainly ok with this, but I just wanted to make sure we are all on the same page here. Thoughts?


Outside of the context of AsyncRead + AsyncWrite, users could simply filter the standard Stream<WSEvent> to extract only messages, and from there, and_then the messages onto a decoder which will extract their application data (like protobuf or whatever). Similarly with sending messages, they can simply encode their data and pass it to the Sink as a WSMessage::Binary(data).
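A rough sketch of that userland approach (futures 0.1 style; decode_app_frame stands in for any user-supplied decoder such as a protobuf parser):

use futures::Stream;

// Filter the WSEvent stream down to binary messages, then decode them.
let decoded = stream
    .filter_map(|event| match event {
        WSEvent::Message(WSMessage::Binary(data)) => Some(data),
        _ => None,
    })
    .map(|data| decode_app_frame(&data));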


najamelan commented Apr 3, 2019 via email


thedodd commented Apr 3, 2019

@najamelan you're good. Definitely not spamming :). I read through the code there, and it looks like it is all implemented for futures 0.3 & nightly async/await. I noticed a few things as well which appeared to be unimplemented and such, so I wasn't sure where that effort was at.

IIRC, we are trying to keep things on stable, so that might be problematic. Have you done anything with reconnects? If not, perhaps that is something we can cover here as well. Let me know, and def don't worry about spamming. Everything you've said so far is definitely pertinent.


thedodd commented Apr 3, 2019

Ok, I've updated the body of this issue to reflect the discussion so far, including @yoshuawuyts' AsyncRead+AsyncWrite recommendations and @Pauan's exponential backoff recommendation.

@yoshuawuyts

@thedodd Good questions! Weird idea: implement AsyncRead + AsyncWrite + Stream + Sink on the socket, and use it to allow passing down bytes through io, and strings through the stream.

Not sure if that'd be fantastic or terrible. But perhaps worth considering?


thedodd commented Apr 3, 2019

@yoshuawuyts glad to hear you say that, because that’s exactly what I am doing right now 🙌

I’ll have PR up tomorrow.


fitzgen commented Apr 3, 2019

Does it make sense in this case to have a mid-level API in between some higher-level futures-y/streams-y/channel-y API and web-sys that gives the WebSocket API but with F: FnMut(..) callbacks instead of &js_sys::Function and uses gloo_events internally?

@yoshuawuyts

@fitzgen The binding logic will need to be written anyway as part of the higher-level API, so creating an intermediate API as a basis to build the streams on seems like good engineering practice. 👍


thedodd commented Apr 4, 2019

@fitzgen & @yoshuawuyts so, yes. We can definitely do that. Building on top of the gloo_events system is definitely a good idea.

We will probably want two crates for this, one for each. Thoughts? I'll update the body of the issue above with this info, and then put together details on the callbacks-based API.


fitzgen commented Apr 4, 2019

> We will probably want two crates for this, one for each. Thoughts? I'll update the body of the issue above with this info, and then put together details on the callbacks-based API.

We've been doing a crate per API, which exposes multiple submodules for different levels/layers of that API (e.g. a submodule for callbacks, and another submodule for futures). Unless we have strong motivation otherwise in this case, I think we should be consistent and do that.


thedodd commented Apr 4, 2019

Sounds good! Will do.


thedodd commented Apr 4, 2019

@fitzgen @Pauan @yoshuawuyts hey all, just wanted to give a heads-up that I've updated the body of this issue (the very first card) with details and refinements based on our discussions so far.

I've also organized the two proposals under collapsible sections so that we can more easily navigate and read the proposal overall. I have implemented much of the code already. I'll have a PR open soon (in a WIP state, of course) so that we can begin looking at this in more depth.

Any and all feedback is welcome.

EDIT: so based on the updates to the CONTRIBUTING workflow, it looks like I shouldn't open a PR yet. That's fine. I've already written a lot of the code just to explore the possibilities we've been discussing, but I don't mind holding off on the PR. Let me know.

@yoshuawuyts

@thedodd Regarding the high-level API: I think given how web pages work, it would probably make sense to keep the connection open as long as the page is open, and try to reconnect if it drops. I think it'd be nice if people could get this for free without needing to think about it, and provide an escape hatch for when they want to configure everything manually.

In terms of API I'd propose:

  • WebSocket::connect("/ws"); provides reconnects
  • WebSocket::builder("/ws").build(); does not by default, but provides a .reconnect() method that can be set to pass a strategy. This API could be added later, also.

By building the API this way around, we also open ourselves up to later improve the default reconnect behavior. E.g. we could become clever about detecting network loss, and hold off on reconnecting until connectivity is restored. Or find some other heuristics that might be useful to go by.

Also perhaps we should consider having reconnect strategies that can be shared between other network modules?


thedodd commented Apr 5, 2019

@yoshuawuyts that's a good call.

For folks that really need predictability on that front, they can use a custom config, else they will get the default reconnect config.

Perhaps we should make it even more clear about opting out of reconnects by doing something like: WebSocket::builder("/ws").no_reconnect().build(). This would give us a more uniform builder pattern as well.

EDIT: so at this point, I'm thinking we should have a few simpler constructors for the high-level type, mainly because using a builder pattern when only one parameter could change seems ... not so great. How about this:

  • WebSocket::connect("/ws"): uses default reconnect config.
  • WebSocket::no_reconnect("/ws"): creates an instance which will not attempt to reconnect.
  • WebSocket::custom_reconnect("/ws", cfg): creates an instance with custom reconnect config.

Thoughts?


thedodd commented Apr 5, 2019

@yoshuawuyts & @Pauan two additional items which come to mind as I've been building out the mid-level API:

  • First, we need to allow users to specify subprotocols on the connection. This is already supported by the web_sys::WebSocket; we will just need to pass the information through. This is already done in the code for the mid-level API, and is updated in the design spec above. We just need to determine how we want to incorporate this into the high-level builder (for the futures type).
    Basically what this indicates to me is that perhaps we should just use a builder type exactly like the one for the mid-level type (but without callbacks, obviously). See mid-level builder details above.
  • Second, there is really only one place where we are dealing with fallible method calls: the call to build the web_sys::WebSocket, which returns Result<Self, JsValue>. Forcing users of Gloo to deal with JsValue errors seems antithetical to Gloo's goals (particularly the Idiomatic goal).
    So, should we create a new error type which can wrap JsValue errors coming from the Web APIs, which all of the Gloo child crates can use? We can pop another issue for this if we don't already have a solution in place.


thedodd commented Apr 5, 2019

I have some code in place. It is not ready for peer review, and there is plenty of work to be done, but this will help to coordinate our design discussion as we move forward.

Once our design session has solidified, we can finish up implementation & open the PR against this repo.

https://github.com/thedodd/gloo/pull/1/files


Pauan commented Apr 6, 2019

Thanks, this is looking a lot better!

> If we buffer, it will cause the appearance of the request being in progress and simply waiting for a response

I think I should clarify how this all fits together. The Sink trait is exactly intended for these situations where there may be some delay before sending. The documentation says:

> Sending to a sink is "asynchronous" in the sense that the value may not be sent in its entirety immediately.
>
> Instead, values are sent in a two-phase way: first by initiating a send, and then by polling for completion.
>
> This two-phase setup is analogous to buffered writing in synchronous code, where writes often succeed immediately, but internally are buffered and are actually written only upon flushing.

The Sink trait is absolutely intended to handle buffering (since almost everything uses buffering!)

The way it works is that when start_send is called, it will start buffering the message (it returns AsyncSink::NotReady if the buffer is full, not if the message cannot be sent right away!)

After that, poll_complete is called, which says when the message has been fully sent (so it would return Async::NotReady if the network is down).

However, those are internal methods used by the implementation of Sink. Instead, users would normally use Sink::send, which attempts to send the message and returns a Future which will resolve with the Sink after the message has been sent.

As an example of how it would look with async/await syntax:

let ws = WebSocket::connect("/ws");

let (sink, stream) = ws.split();

// Attempts to send the message and waits for it to complete
let sink = await!(sink.send(WsMessage::Text(...)));

// Attempts to send another message and waits for it to complete
let sink = await!(sink.send(WsMessage::Text(...)));

// Attempts to send multiple messages and waits for them all to complete
let (sink, _) = await!(sink.send_all(iter_ok::<_, ()>(vec![
    WsMessage::Text(...),
    WsMessage::Text(...),
    WsMessage::Text(...),
])));

Because it returns a Future, it's quite clear that the message will take some time to send (and you have to wait for the Future to resolve in order to be sure that the message has been sent).

So in the case of a network failure, sink.send would simply stop and wait until the network is back. It fits perfectly with the retry semantics.

(Of course you can use various things to make it happen in parallel if you want to, but the default is sequential)

If the user wishes to put a timeout for the message send, they can, but in that case they would use a generic timeout system which works with any Future:

let sink = await!(Timeout::new(10000, sink.send(WsMessage::Text(...))));

There's no need to build timeouts into WebSocket, since this can be handled externally.

> So, should we create a new error type which can wrap JsValue errors coming from the Web APIs which all of the Gloo child crates can use?

Yes, absolutely. Usually this would be handled by creating a Rust enum and then mapping from the JsValue into it (and back again).
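For illustration, a minimal sketch of such a wrapper error type (the variant set here is illustrative, not a settled design):

use wasm_bindgen::JsValue;

#[derive(Debug)]
pub enum WsError {
    /// The URL was malformed or used a disallowed scheme.
    InvalidUrl(String),
    /// Any other error surfaced by the underlying Web APIs.
    Js(JsValue),
}

impl From<JsValue> for WsError {
    fn from(value: JsValue) -> Self {
        WsError::Js(value)
    }
}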


richard-uk1 commented Jan 7, 2020

I've made a websocket abstraction for myself, and I thought I'd share it here in case it can serve as inspiration.

What I've come to realise is that the websocket API maps pretty cleanly to futures::Sink and futures::Stream, and so I implemented a mapping between them.

A change I definitely want to make is to make the new function synchronous, and use poll_ready to indicate when to start writing messages.

websocket abstraction using futures
//! Websocket client wrapper

use ::{
    futures::{
        channel::{mpsc, oneshot},
        prelude::*,
        select,
        stream::FusedStream,
    },
    gloo::events::EventListener,
    std::{
        pin::Pin,
        task::{Context, Poll},
    },
    wasm_bindgen::{prelude::*, JsCast},
};

#[derive(Debug)]
pub struct WebSocket {
    inner: web_sys::WebSocket,
    close_listener: EventListener,
    close_rx: oneshot::Receiver<()>,
    pending_close: Option<()>,
    message_listener: EventListener,
    message_rx: mpsc::UnboundedReceiver<JsValue>,
    pending_message: Option<JsValue>,
    error_listener: EventListener,
    error_rx: mpsc::UnboundedReceiver<JsValue>,
    closed: bool,
}

impl WebSocket {
    /// Try to connect to the given url.
    ///
    /// Currently the future returned by this function eagerly initiates the connection before it
    /// is polled.
    pub async fn new(url: &str) -> Result<Self, JsValue> {
        let inner = web_sys::WebSocket::new(url)?;
        inner.set_binary_type(web_sys::BinaryType::Arraybuffer);

        let (open_tx, open_rx) = oneshot::channel();
        let open_listener = EventListener::once(&inner, "open", move |_| {
            open_tx.send(()).unwrap_throw();
        });

        let (error_tx, mut error_rx) = mpsc::unbounded();
        let error_listener = EventListener::new(&inner, "error", move |event| {
            error_tx
                .clone()
                .unbounded_send((***event).to_owned())
                .unwrap_throw();
        });

        select! {
            _ = open_rx.fuse() => (),
            err = error_rx.next() => {
                return Err(err.unwrap_throw());
            },
        };

        let (message_tx, message_rx) = mpsc::unbounded();
        let message_listener = EventListener::new(&inner, "message", move |event| {
            let message: &web_sys::MessageEvent = event.dyn_ref().unwrap_throw();
            message_tx
                .clone()
                .unbounded_send(message.data())
                .unwrap_throw();
        });

        let (close_tx, close_rx) = oneshot::channel();
        let close_listener = EventListener::once(&inner, "close", move |_event| {
            close_tx.send(()).unwrap_throw();
        });

        Ok(WebSocket {
            inner,
            close_listener,
            close_rx,
            pending_close: None,
            message_listener,
            message_rx,
            pending_message: None,
            error_listener,
            error_rx,
            closed: false,
        })
    }

    /// Initiate closing of the connection. The websocket should only be dropped once the stream
    /// has been exhausted.
    pub fn close(self) {
        self.inner.close().expect_throw(
            "we are not using code or reason, so they cannot be incorrectly formatted",
        );
    }
}

impl Stream for WebSocket {
    type Item = Result<Vec<u8>, JsValue>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }

        if let Some(msg) = self.pending_message.take() {
            return Poll::Ready(Some(Ok(decode(msg))));
        }

        // check this last so that the last event is a close event.
        if self.pending_close.take().is_some() {
            self.closed = true;
            return Poll::Ready(None);
        }

        match (
            Stream::poll_next(Pin::new(&mut self.error_rx), cx),
            Stream::poll_next(Pin::new(&mut self.message_rx), cx),
            Future::poll(Pin::new(&mut self.close_rx), cx),
        ) {
            (Poll::Ready(error), message_poll, close_poll) => {
                let error =
                    error.expect_throw("the error channel should never be polled when closed");
                if let Poll::Ready(msg) = message_poll {
                    self.pending_message = Some(
                        msg.expect_throw("the message channel should never be polled when closed"),
                    ); // we know the old value is none from before.
                }
                if let Poll::Ready(close) = close_poll {
                    self.pending_close = Some(
                        close.expect_throw("the close channel should never be polled when closed"),
                    );
                }
                Poll::Ready(Some(Err(error)))
            }
            (Poll::Pending, Poll::Ready(message), close_poll) => {
                let message =
                    message.expect_throw("the message channel should never be polled when closed");
                if let Poll::Ready(close) = close_poll {
                    self.pending_close = Some(
                        close.expect_throw("the close channel should never be polled when closed"),
                    );
                }
                Poll::Ready(Some(Ok(decode(message))))
            }
            (Poll::Pending, Poll::Pending, Poll::Ready(_)) => {
                self.closed = true;
                Poll::Ready(None)
            }
            (Poll::Pending, Poll::Pending, Poll::Pending) => Poll::Pending,
        }
    }
}

impl FusedStream for WebSocket {
    fn is_terminated(&self) -> bool {
        self.closed
    }
}

impl Sink<Vec<u8>> for WebSocket {
    type Error = JsValue;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // because we only return a websocket once setup is complete, this always returns Ok. todo
        // investigate getting rid of async in `connect` and instead using this function.
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, mut item: Vec<u8>) -> Result<(), Self::Error> {
        self.inner.send_with_u8_array(&mut item)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // once we have sent a message, we get no confirmation of whether sending was successful.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // todo investigate sending the close message.
        Poll::Ready(Ok(()))
    }
}

/// Get byte data out of a JsValue
fn decode(val: JsValue) -> Vec<u8> {
    js_sys::Uint8Array::new(&val).to_vec()
}

@richard-uk1

Writing this was hard and time-consuming, and so I would definitely have appreciated it if gloo had provided an abstraction I could just use! :)

@najamelan

@derekdreery I think you are looking for ws_stream_wasm.

@richard-uk1

@najamelan cool, would you think about helping with the gloo effort, to get something into here?

@najamelan

@derekdreery I tried, but the webassembly wg preferred to roll their own version, of which this issue is the result.

If gloo wants to adopt ws_stream_wasm, that's fine by me, and if they intend to maintain it and keep it working, they can even run rustfmt on it. It can be renamed and I can deprecate the current crate.

Or it could just be renamed to gloo-websocket, moved into this repository, and I continue to maintain it. Currently it does depend on a server backend in ws_stream_tungstenite for testing, and the two crates make sense to be used in tandem, since otherwise there is no websocket crate that provides AsyncRead/AsyncWrite on the server side.


ranile commented Aug 20, 2021

I've been implementing this for reqwasm in ranile/reqwasm#4, which can then potentially be re-exported by gloo. I've also mentioned re-exporting here: #4 (comment)

PS: It'd be great if anyone could review that PR.
