
Comments (35)

yoshuawuyts avatar yoshuawuyts commented on July 30, 2024 2

@thedodd Regarding the high-level API: I think given how web pages work, it would probably make sense to keep the connection open as long as the page is open, and try to reconnect if it drops. I think it'd be nice if people could get this for free without needing to think about it, and provide an escape hatch for when they want to configure everything manually.

In terms of API I'd propose:

  • WebSocket::connect("/ws"); provides reconnects
  • WebSocket::builder("/ws").build(); does not reconnect by default, but provides a .reconnect() method that can be used to pass a strategy (sketched below). This API could also be added later.

By building the API this way around, we also open ourselves up to later improve the default reconnect behavior. E.g. we could become clever about detecting network loss, and hold off on reconnecting until connectivity is restored. Or find some other heuristics that might be useful to go by.

Also, perhaps we should consider having reconnect strategies that can be shared with other network modules?
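
A rough sketch (every name here is an assumption for illustration, not a settled API) of those two entry points, plus a pluggable strategy trait that other network modules could share:

// Hypothetical API sketch; ReconnectStrategy and MyBackoff are placeholders.
pub trait ReconnectStrategy {
    /// Delay before the next reconnect attempt, or `None` to give up entirely.
    fn next_delay_ms(&mut self, attempt: u32) -> Option<u32>;
}

// High-level constructor: reconnects automatically with the default strategy.
let ws = WebSocket::connect("/ws");

// Builder: no reconnects unless a strategy is supplied explicitly.
let ws = WebSocket::builder("/ws")
    .reconnect(MyBackoff::default())
    .build();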


ranile avatar ranile commented on July 30, 2024 2

I've been implementing this for reqwasm in ranile/reqwasm#4, which can then potentially be re-exported by gloo. I've also mentioned re-exporting here: #4 (comment)

PS: It'd be great if anyone could review that PR.


Pauan avatar Pauan commented on July 30, 2024 1

The callbacks can be simply stored in an Rc wrapped inside of an enum to account for function type variance (the Rc may not be needed). This is what I did. Worked out pretty well.

With gloo-events, it will be possible to store them directly, as EventListener:

struct WebSocket {
    events: Vec<EventListener>,
}

This makes it a bit tricky to remove a particular event though, so you'd probably need Rc + RefCell for that.
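
A sketch (an assumption, not a settled gloo API; assuming the gloo_events crate) of shared, removable listener storage along those lines, where each listener gets a key so it can be dropped, and therefore detached, later:

use std::{cell::RefCell, collections::HashMap, rc::Rc};
use gloo_events::EventListener;

#[derive(Default, Clone)]
struct Listeners {
    inner: Rc<RefCell<HashMap<&'static str, EventListener>>>,
}

impl Listeners {
    fn insert(&self, key: &'static str, listener: EventListener) {
        self.inner.borrow_mut().insert(key, listener);
    }

    fn remove(&self, key: &'static str) {
        // Dropping the EventListener detaches it from its event target.
        self.inner.borrow_mut().remove(key);
    }
}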

The only potential drawback discussed so far is the name. web-sys uses the name WebSocket and the type being proposed here would bear the same name.

Is that a downside? Do you think users will get confused between web_sys::WebSocket and gloo::websockets::WebSocket?


yoshuawuyts avatar yoshuawuyts commented on July 30, 2024 1

edit: my post here is about a high-level API. If the proposal was about a mid-level API, then I'm sorry if this is slightly derailing things!

So something that comes to mind with WebSockets is that when using them, people will likely want to add their own framing on top to convert raw messages into actual structs. It could be as simple as decoding some JSON, but it could also be more involved, with custom headers and parsing steps.

A crate such as tokio-codec allows creating reusable parsers through the Encoder and Decoder traits, which convert an AsyncRead -> Decoder -> Stream, and an AsyncWrite -> Encoder -> Sink.

In practice this means that if our websocket abstraction could be a duplex of AsyncRead and AsyncWrite, all userland parsers that know how to convert those into framed streams would work out of the box, with little to no changes needed, essentially turning a websocket into something more closely resembling an OS socket.

Examples

echo client
receive, decode, re-encode, and send back

use my_protocol;

let socket = await? WebSocket::connect("/ws");
let proto_stream = my_protocol::frame(socket)?;

let (mut reader, mut writer) = proto_stream.split();
await? reader.copy_into(&mut writer); // echo everything read back out to the writer

print client
receive, parse, and print to console

use my_protocol;

let socket = await? WebSocket::connect("/ws");
let proto_stream = my_protocol::frame(socket)?;

for await? item in proto_stream {
    println!("msg received {:?}", item?);
}


thedodd avatar thedodd commented on July 30, 2024 1

@yoshuawuyts that is excellent feedback. Those traits would not only give us the benefits outlined here, but would also add the ones you’ve outlined.

Ok, I’ll update the design with that in mind. I think that is in line with what @najamelan was doing as well.


thedodd avatar thedodd commented on July 30, 2024 1

Ok, I've updated the body of this issue to reflect the discussion so far, including @yoshuawuyts' AsyncRead+AsyncWrite recommendations and @Pauan's exponential backoff recommendation.


yoshuawuyts avatar yoshuawuyts commented on July 30, 2024 1

@thedodd Good questions! Weird idea: implement AsyncRead + AsyncWrite + Stream + Sink on the socket, and use it to allow passing down bytes through io, and strings through the stream.

Not sure if that'd be fantastic or terrible. But perhaps worth considering?


thedodd avatar thedodd commented on July 30, 2024 1

@fitzgen & @yoshuawuyts so, yes. We can definitely do that. Building on top of the gloo_events system is definitely a good idea.

We will probably want two crates for this, one for each. Thoughts? I'll update the body of the issue above with this info, and then put together details on the callbacks-based API.


fitzgen avatar fitzgen commented on July 30, 2024 1

We will probably want two crates for this, one for each. Thoughts? I'll update the body of the issue above with this info, and then put together details on the callbacks-based API.

We've been doing a crate per API, which exposes multiple submodules for different levels/layers of that API (e.g. a submodule for callbacks, and another submodule for futures). Unless we have strong motivation otherwise in this case, I think we should be consistent and do that.


najamelan avatar najamelan commented on July 30, 2024 1

@derekdreery I think you are looking for ws_stream_wasm.


thedodd avatar thedodd commented on July 30, 2024

@Pauan as far as storage of event listeners, that sounds great.

As far as the name being a potential drawback, I honestly don't think it will be an issue. It was the only thing that I could think of as a detractor though :).

Thanks for the feedback!


najamelan avatar najamelan commented on July 30, 2024

@thedodd you could have a look at #25


fitzgen avatar fitzgen commented on July 30, 2024

@thedodd were you going to add a sketch of types and function/method signatures to this design proposal, as discussed in the WG meeting?


thedodd avatar thedodd commented on July 30, 2024

@fitzgen yes, will do. I added a TODO item at the bottom of the description :). Will hopefully be able to get around to it shortly.


thedodd avatar thedodd commented on July 30, 2024

@fitzgen @Pauan here are the thoughts on the design so far. This essentially communicates what I had in mind, and it is compatible with stable rust.

The main thing I am looking for feedback on is what you all think of the SplitSink & SplitStream as the main abstraction for using this type (as most users will want to read from and write to the socket). It is pretty much perfectly in line with the futures ecosystem, so I suspect this won't be a big deal. However, any and all feedback is welcome.

I will update the main body of this issue based on our discussion here.

EDIT: a distillation of this content has been moved up to the opening body of this issue. The original content here has been preserved, but put into a collapsible section for brevity.

Original Design Post

A high-level futures-based API for Websockets built on top of web-sys.

This type will implement both futures Stream & Sink which will allow for futures-based reading and writing on the underlying Websocket.

The internal interface of this type handles all aspects of the underlying Websocket. The type will handle all events coming from the underlying Websocket in order to handle reconnects. A set of Rust enums are used for representing the various event types and variants, as well as the Websocket state. In order to interface with the web-sys callback-based event handling, this type uses futures::mpsc channels to receive the events coming from the underlying Websocket.

initial design

The following is an example of how to build an instance of the gloo Websocket type.

// Build a new Websocket instance.
// EDIT: Per some initial discussion, the second argument is a placeholder
// for some exponential backoff pattern. Needs more discussion, but the idea
// is that this is where reconnect patterns are configured.
let ws = Websocket::new("wss://api.example.com/ws", Some(5));

// Use sink to send messages & stream to receive messages.
// This comes from https://docs.rs/futures/0.1.25/futures/stream/trait.Stream.html#method.split
let (sink, stream) = ws.split();

Internally, the Websocket will look something like this.

struct Websocket {
    /// The underlying Websocket instance.
    /// 
    /// If this instance is configured to reconnect, this web_sys::WebSocket will be swapped out
    /// on reconnects.
    ws: web_sys::WebSocket,

    /// The optional configuration for handling reconnects.
    reconnect: Option<u32>,

    /// The channel receiver used for streaming in the events from the underlying Websocket.
    /// 
    /// The sending side is used when building the 4 `on_*` closures sent over to JS land. We do
    /// not retain it, as we should never need it again after this type is built.
    receiver: UnboundedReceiver<WSEvent>,

    /// An array of the already cast wasm-bindgen closures used internally by this type.
    /// 
    /// Their ordering is as follows:
    /// 
    /// 1. on_message
    /// 2. on_open
    /// 3. on_error
    /// 4. on_close
    /// 
    /// **NB:** The ordering here is very important. In order to avoid having to recast the
    /// various closures when we need to reconnect, we store the 4 different closures as
    /// `Rc<js_sys::Function>`s and then we ensure that we pass them to the appropriate handlers
    /// during reconnect.
    /// 
    /// ALTERNATIVELY: we could just store these in four different fields as their closure types
    /// and then re-cast whenever we need to do a reconnect.
    callbacks_internal: [Rc<js_sys::Function>; 4],

    /// For non-reconnecting instances, this will be true when the underlying Websocket is closed.
    /// 
    /// At that point in time, the next iteration of this instance's stream will return `None` &
    /// any attempts to send messages via this sink will immediately return an error.
    is_closed: bool,
}

enum WSEvent {
    Open(Event),
    Message(WSMessage),
    Error(Event),
    Close(Event),
}

/// An enumeration of the different types of Websocket messages which may be received.
/// 
/// The gloo Websocket will unpack received messages and present them as the appropriate variant.
enum WSMessage {
    Text(String),
    Binary(Vec<u8>),
}

stream | sink | split

Given the above types, we can implement Stream & Sink on Websocket so that it may be used for sending and receiving Websocket messages asynchronously.

sink

Sink will be a very simple implementation. We will implement Sink over WSMessage, as shown above, which will allow users to send simple string based messages, and will also allow them to send more sophisticated binary data using the same Sink interface.

Ultimately, no buffering will be employed by this sink implementation. Reconnecting instances will simply return NotReady when the underlying Websocket is being rebuilt per an error or close event. Any call to start_send will start and finish the send operation on the underlying Websocket.

stream

Stream will also be a very simple implementation. It will mutably borrow the mpsc::UnboundedReceiver which is encapsulated by this type. All events from the underlying Websocket will come through this stream.
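
A minimal sketch (futures 0.1, to match the NotReady/start_send terminology used elsewhere in this proposal) of a Stream impl that simply drains the internal receiver; reconnect handling would hook in here by intercepting the Error/Close variants before yielding them:

use futures::{Async, Poll, Stream};

impl Stream for Websocket {
    type Item = WSEvent;
    type Error = ();

    fn poll(&mut self) -> Poll<Option<WSEvent>, ()> {
        // Non-reconnecting instance that has already closed: end the stream.
        if self.is_closed {
            return Ok(Async::Ready(None));
        }
        // Events pushed by the four on_* closures arrive through this channel.
        self.receiver.poll()
    }
}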

split

Many users of this type will need to read from and write to the Websocket. Use of Stream::split is the recommended way of managing this. The two handles to this type may then be used throughout the user's app as needed.

reconnecting

It would seem that the only logical location for the reconnect logic to be driven from would be the stream impl. The stream is the location where error & close events from the underlying Websocket will be detected. This means that the Websocket stream must be polled in order for the reconnect functionality to work. I suspect this will hardly be an issue as most users will want to be reading from the stream already, and for those who do not, spawning a .for_each() future is simple enough.


Pauan avatar Pauan commented on July 30, 2024

The second optional argument will cause the Websocket to handle disconnects by automatically attempting to reconnect at the given interval in seconds.

I normally try to avoid harsh language, but this seems really wrong.

Imagine a server is connected to 1 million WebSocket clients (which is not unreasonable). The server goes down (maybe for routine maintenance, maybe a crash).

All 1 million WebSocket clients attempt to reconnect at the same time. This of course fails, so then they try to reconnect again 5 seconds later, then another 5 seconds later...

This causes an incredibly massive "thundering herd" of reconnection attempts which overwhelms the network, which can cause other servers to fail under the pressure, which then causes a further cascade of server crashes...

The correct thing to do is to use exponential backoff to progressively slow down the rate of reconnecting.

It also needs to use some randomization to prevent the "thundering herd" problem of millions of clients attempting to reconnect at the same time.

Because this is such an important problem, and it's hard to get it right, we should just Do The Right Thing and handle the exponential backoff internally.

(Exponential backoff is also used in other areas, for example to prevent lock contention in databases.)
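
For concreteness, a minimal sketch (a hypothetical helper, not part of any proposal here) of exponential backoff with full jitter:

fn next_delay_ms(attempt: u32) -> u32 {
    // Double a 500ms base delay on every failed attempt, capped at 30 seconds.
    let max = 500u32.saturating_mul(2u32.saturating_pow(attempt)).min(30_000);
    // "Full jitter": pick a random point in [0, max) so that millions of
    // clients do not retry in lockstep after a shared outage.
    (js_sys::Math::random() * max as f64) as u32
}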

enum WSEvent

Is there any need for the Close event? Can't that be handled by the Stream returning None?

Similarly, I imagine Error isn't needed either, since that will just cause the WebSocket to retry.

Is there any use case for Open, or can we just remove WSEvent entirely?

Ultimately, no buffering will be employed by this sink implementation.

Why not? Since reconnecting will be an important use case, that essentially means that users will have to implement their own buffering strategy, which doesn't seem better than having it be built-in.

Reconnecting instances will simply return NotReady when the underlying Websocket is being rebuilt per an error or close event.

I don't see NotReady mentioned anywhere.

The stream is the location where error & close events from the underlying Websocket will be detected. This means that the Websocket stream must be polled in order for the reconnect functionality to work.

It should be possible to handle that all internally, inside of the actual error and close event callbacks.


Pauan avatar Pauan commented on July 30, 2024

I suppose one benefit of the Open and Close enum variants is that they (potentially) allow for the application to show a message/loading screen when the WebSocket goes down.

But in that case it needs to pass more information, such as how long until the next retry.


thedodd avatar thedodd commented on July 30, 2024

retry / backoff

Yes, I almost included exponential backoff in the design above. It was the first thing I reached for, but I decided to go with a simpler proposal so that we could focus on the details of the WebSocket stream+sink abstraction first. I'll go ahead and add an // EDIT: item to the above design about using exp backoff.

WSEvent

With this model, the WSEvent enum will be used by the callbacks given to the web_sys::WebSocket. These closures pass their events over the unbounded sender which corresponds to the receiver you see above. They use WSEvent so that they can all use the same mpsc channel. I mention all of this in the comment above.

The variants of that enum are used to drive the logic for reconnect, disconnect &c. That is why they are needed. I was planning on forwarding those events as well so that users can trigger custom events in their apps.

sink buffering

The choice not to buffer is due to the nature of the type of messages. They are frames to be sent over the socket, sure, but they are analogous to a network request. If we choose to buffer, then we need to consider setting up timeouts on the buffered frames, and this adds a lot of complexity. Read on ...

The bit about NotReady is in the context of using Sink, where I also mention Sink.start_send(). The Sink trait's start_send must return a result of AsyncSink. Its NotReady variant is generic over the Sink's item type, and will return the element which was given to be sent through the sink.

Failing a user-initiated network request when the network is disconnected seems pretty logical. It draws attention to the problem immediately. If we buffer, it will cause the appearance of the request being in progress and simply waiting for a response, when in reality the request has not even been sent yet. IMHO, better to just fail the request immediately, remove the perceived latency, and just return NotReady(msg) from the start_send(msg) call.
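
A sketch (futures 0.1; the JsValue error type and the exact conditions are assumptions for illustration) of the start_send behaviour described above, where a disconnected-but-reconnecting socket hands the frame straight back to the caller:

// futures 0.1: StartSend<T, E> = Result<AsyncSink<T>, E>
fn start_send(&mut self, msg: WSMessage) -> StartSend<WSMessage, JsValue> {
    if self.is_closed {
        // Non-reconnecting instance that has closed for good: fail fast.
        return Err(JsValue::from_str("websocket is closed"));
    }
    if self.ws.ready_state() != web_sys::WebSocket::OPEN {
        // Reconnecting: no buffering, the caller keeps the frame.
        return Ok(AsyncSink::NotReady(msg));
    }
    match msg {
        WSMessage::Text(text) => self.ws.send_with_str(&text)?,
        WSMessage::Binary(mut data) => self.ws.send_with_u8_array(&mut data)?,
    }
    Ok(AsyncSink::Ready)
}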

As these sorts of things go, it is going to be six of one, half a dozen of the other. Different apps have different needs ... so, see the next section.

builder pattern

I was also considering introducing a builder pattern for the WebSocket at first so that we could do more complex configuration based on reconnects / buffering &c. @Pauan if you think we really need to support Sink buffering and some of the other WG folks agree, I am happy to put together a design which includes a builder pattern which will allow us to more clearly configure things like buffering, buffer timeouts, exponential backoff config and the like.

Thoughts?


thedodd avatar thedodd commented on July 30, 2024

@yoshuawuyts another thing which I take from your comment above is that we may want to support short-hand connection strings. I.e., if a user provides a connection string of "/ws", we will parse it and see that it is not a valid URL, and in such a case we will use the window's protocol & host, and then append the "/ws" to the end.

Is this something you think we should look into as well?
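
If we do, a rough sketch (assuming web-sys; resolve_ws_url is a hypothetical helper) of that expansion could look like:

fn resolve_ws_url(path: &str) -> String {
    // Absolute URLs are passed through untouched.
    if path.starts_with("ws://") || path.starts_with("wss://") {
        return path.to_string();
    }
    let location = web_sys::window().expect("no window").location();
    let protocol = location.protocol().expect("no protocol"); // "http:" or "https:"
    let host = location.host().expect("no host");             // e.g. "example.com:8080"
    let ws_proto = if protocol == "https:" { "wss:" } else { "ws:" };
    format!("{}//{}{}", ws_proto, host, path)
}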


thedodd avatar thedodd commented on July 30, 2024

@yoshuawuyts a concern that I have about implementing AsyncRead + AsyncWrite on this type is that we would be forced to deal with all data sent and received as binary data, essentially discarding a frame's opcode indicating if it is a string or bytes frame. The encoder expects to be able to send all data as bytes, and the decoder expects to decode an item out of a bytes buffer.

We can certainly do this, but we will have to communicate a disclaimer to users that all data sent via the AsyncWrite will be framed with the binary opcode in the WebSocket frame. Similarly, we will have to make a choice on how to handle messages from the AsyncRead side. Should we just shoehorn all string and binary frames into the bytes buf? We can, but we will have to communicate this.

The implementation is simple enough, I just wanted to bring it up. I'm certainly ok with this, but I just wanted to make sure we are all on the same page here. Thoughts?


Outside of the context of AsyncRead + AsyncWrite, users could simply filter the standard Stream<WSEvent> to extract only messages, and then from there, and_then the messages onto a decoder which will extract their application data (like protobuf or whatever). Similarly with sending messages, they can simply encode their data and pass it to the Sink as a WSMessage::Binary(data).
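
A sketch (futures 0.1 combinators over the WSEvent/WSMessage types from this proposal; decode_frame is a hypothetical user-land decoder) of that filtering approach:

let items = ws_stream
    // Keep only message frames, normalizing text frames into bytes.
    .filter_map(|event| match event {
        WSEvent::Message(WSMessage::Binary(bytes)) => Some(bytes),
        WSEvent::Message(WSMessage::Text(text)) => Some(text.into_bytes()),
        _ => None,
    })
    // Hand the raw bytes to the application's own decoder (protobuf or whatever).
    .map(|bytes| decode_frame(&bytes));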


najamelan avatar najamelan commented on July 30, 2024


thedodd avatar thedodd commented on July 30, 2024

@najamelan you're good. Definitely not spamming :). I read through the code there, and it looks like it is all implemented for futures 0.3 & nightly async/await. I noticed a few things as well which appeared to be unimplemented and such, so I wasn't sure where that effort was at.

IIRC, we are trying to keep things on stable, so that might be problematic. Have you done anything with reconnects? If not, perhaps that is something we can cover here as well. Let me know, and def don't worry about spamming. Everything you've said so far is definitely pertinent.


thedodd avatar thedodd commented on July 30, 2024

@yoshuawuyts glad to hear you say that, because that’s exactly what I am doing right now 🙌

I’ll have a PR up tomorrow.


fitzgen avatar fitzgen commented on July 30, 2024

Does it make sense in this case to have a mid-level API in between some higher-level futures-y/streams-y/channel-y API and web-sys, which gives the WebSocket API but with F: FnMut(..) callbacks instead of &js_sys::Function and uses gloo_events internally?
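
For reference, a rough sketch (names and signatures here are assumptions, not anything agreed upon) of what such a mid-level surface could look like, with plain Rust closures and gloo_events keeping the listeners alive:

use gloo_events::EventListener;
use wasm_bindgen::{JsCast, JsValue, UnwrapThrowExt};

pub struct WebSocket {
    inner: web_sys::WebSocket,
    listeners: Vec<EventListener>,
}

impl WebSocket {
    pub fn new(url: &str) -> Result<Self, JsValue> {
        let inner = web_sys::WebSocket::new(url)?;
        Ok(Self { inner, listeners: Vec::new() })
    }

    /// Register a message callback; the listener lives as long as `self` does.
    pub fn on_message<F>(&mut self, mut callback: F)
    where
        F: FnMut(web_sys::MessageEvent) + 'static,
    {
        let listener = EventListener::new(&self.inner, "message", move |event| {
            let event = event.dyn_ref::<web_sys::MessageEvent>().unwrap_throw().clone();
            callback(event);
        });
        self.listeners.push(listener);
    }
}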


yoshuawuyts avatar yoshuawuyts commented on July 30, 2024

@fitzgen The binding logic will need to be written anyway as part of the higher-level API, so creating an intermediate API as a basis to build the streams on seems like good engineering practice. 👍


thedodd avatar thedodd commented on July 30, 2024

Sounds good! Will do.


thedodd avatar thedodd commented on July 30, 2024

@fitzgen @Pauan @yoshuawuyts hey all, just wanted to give a heads-up that I've updated the body of this issue (the very first card) with details and refinements based on our discussions so far.

I've also organized the two proposals under collapsible sections so that we can more easily navigate and read the proposal overall. I have implemented much of the code already. I'll have a PR open soon (in a WIP state, of course) so that we can begin looking at this in more depth.

Any and all feedback is welcome.

EDIT: so based on the updates to the CONTRIBUTING workflow, it looks like I shouldn't open a PR yet. That's fine. I've already written a lot of the code just to explore the possibilities we've been discussing, but I don't mind holding off on the PR. Let me know.


thedodd avatar thedodd commented on July 30, 2024

@yoshuawuyts that's a good call.

For folks that really need predictability on that front, they can use a custom config; otherwise they will get the default reconnect config.

Perhaps we should make it even more clear about opting out of reconnects by doing something like: WebSocket::builder("/ws").no_reconnect().build(). This would give us a more uniform builder pattern as well.

EDIT: so at this point, I'm thinking we should have a few different more simple constructors for the high-level type. Mainly because using a builder pattern for this when it is only one parameter which could change seems ... not so great. How about this:

  • WebSocket::connect("/ws"): uses default reconnect config.
  • WebSocket::no_reconnect("/ws"): creates an instance which will not attempt to reconnect.
  • WebSocket::custom_reconnect("/ws", cfg): creates an instance with custom reconnect config.

Thoughts?


thedodd avatar thedodd commented on July 30, 2024

@yoshuawuyts & @Pauan two additional items which come to mind as I've been building out the mid-level API:

  • First, we need to allow the users to specify subprotocols on the connection. This is already supported by the web_sys::WebSocket, we will just need to pass the information through. Already done in the code for the mid-level API, and is updated in the design spec above. We just need to determine how we want to incorporate this into the high-level builder (for the futures type).
    Basically what this indicates to me is that perhaps we should just use a builder type exactly like the one for the mid-level type (but without callbacks, obviously). See mid-level builder details above.
  • Second, there are really only a couple of places where we are dealing with fallible method calls. One of them is the call to build the web_sys::WebSocket, which returns Result<Self, JsValue>. Forcing users of Gloo to deal with JsValue errors seems antithetical to Gloo's goals (particularly the Idiomatic goal).
    So, should we create a new error type which can wrap JsValue errors coming from the Web APIs which all of the Gloo child crates can use? We can pop another issue for this if we don't already have a solution in place. (A rough sketch follows below.)
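
A minimal sketch (hypothetical; whether this lives in a shared gloo crate is exactly the open question) of an error type wrapping the JsValue errors surfaced by web-sys:

use wasm_bindgen::JsValue;

#[derive(Debug)]
pub enum WebSocketError {
    /// The browser rejected the connection attempt (bad URL, mixed content, ...).
    Creation(JsValue),
    /// The connection errored after it was established.
    Connection(JsValue),
}

impl From<JsValue> for WebSocketError {
    fn from(err: JsValue) -> Self {
        WebSocketError::Creation(err)
    }
}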


thedodd avatar thedodd commented on July 30, 2024

I have some code in place. It is not ready for peer review, and there is plenty of work to be done, but this will help to coordinate our design discussion as we move forward.

Once our design discussion has solidified, we can finish up the implementation & open the PR against this repo.

https://github.com/thedodd/gloo/pull/1/files


Pauan avatar Pauan commented on July 30, 2024

Thanks, this is looking a lot better!

If we buffer, it will cause the appearance of the request being in progress and simply waiting for a response

I think I should clarify how this all fits together. The Sink trait is exactly intended for these situations where there may be some delay before sending. The documentation says:

Sending to a sink is "asynchronous" in the sense that the value may not be sent in its entirety immediately.

Instead, values are sent in a two-phase way: first by initiating a send, and then by polling for completion.

This two-phase setup is analogous to buffered writing in synchronous code, where writes often succeed immediately, but internally are buffered and are actually written only upon flushing.

The Sink trait is absolutely intended to handle buffering (since almost everything uses buffering!)

The way it works is that when start_send is called, it will start buffering the message (it returns AsyncSink::NotReady if the buffer is full, not if the message cannot be sent right away!)

After that poll_complete is called, which says when the message has been fully sent (so it would return Async::NotReady if the network is down).

However, those are internal methods used by the implementation of Sink. Instead, users would normally use Sink::send, which attempts to send the message and returns a Future which will resolve with the Sink after the message has been sent.

As an example of how it would look with async/await syntax:

let ws = WebSocket::connect("/ws");

let (sink, stream) = ws.split();

// Attempts to send the message and waits for it to complete
let sink = await!(sink.send(WsMessage::Text(...)));

// Attempts to send another message and waits for it to complete
let sink = await!(sink.send(WsMessage::Text(...)));

// Attempts to send multiple messages and waits for them all to complete
let (sink, _) = await!(sink.send_all(iter_ok::<_, ()>(vec![
    WsMessage::Text(...),
    WsMessage::Text(...),
    WsMessage::Text(...),
])));

Because it returns a Future, it's quite clear that the message will take some time to send (and you have to wait for the Future to resolve in order to be sure that the message has been sent).

So in the case of a network failure, sink.send would simply stop and wait until the network is back. It fits perfectly with the retry semantics.

(Of course you can use various things to make it happen in parallel if you want to, but the default is sequential)

If the user wishes to put a timeout for the message send, they can, but in that case they would use a generic timeout system which works with any Future:

let sink = await!(Timeout::new(10000, sink.send(WsMessage::Text(...))));

There's no need to build in timeouts into WebSocket, since it is handled externally.

So, should we create a new error type which can wrap JsValue errors coming from the Web APIs which all of the Gloo child crates can use?

Yes, absolutely. Usually this would be handled by creating a Rust enum and then mapping from the JsValue into it (and back again).


derekdreery avatar derekdreery commented on July 30, 2024

I've made a websocket abstraction for myself, and I thought I'd share it here in case it can serve as inspiration.

What I've come to realise is that the WebSocket API maps pretty cleanly to futures::Sink and futures::Stream, and so I implemented a mapping between them.

A change I definitely want to make is to make the new function synchronous, and use poll_ready to indicate when to start writing messages.

websocket abstraction using futures
//! Websocket client wrapper

use ::{
    futures::{
        channel::{mpsc, oneshot},
        prelude::*,
        select,
        stream::FusedStream,
    },
    gloo::events::EventListener,
    std::{
        pin::Pin,
        task::{Context, Poll},
    },
    wasm_bindgen::{prelude::*, JsCast},
};

#[derive(Debug)]
pub struct WebSocket {
    inner: web_sys::WebSocket,
    close_listener: EventListener,
    close_rx: oneshot::Receiver<()>,
    pending_close: Option<()>,
    message_listener: EventListener,
    message_rx: mpsc::UnboundedReceiver<JsValue>,
    pending_message: Option<JsValue>,
    error_listener: EventListener,
    error_rx: mpsc::UnboundedReceiver<JsValue>,
    closed: bool,
}

impl WebSocket {
    /// Try to connect to the given url.
    ///
    /// Currently the future returned by this function eagerly initiates the connection before it
    /// is polled.
    pub async fn new(url: &str) -> Result<Self, JsValue> {
        let inner = web_sys::WebSocket::new(url)?;
        inner.set_binary_type(web_sys::BinaryType::Arraybuffer);

        let (open_tx, open_rx) = oneshot::channel();
        let open_listener = EventListener::once(&inner, "open", move |_| {
            open_tx.send(()).unwrap_throw();
        });

        let (error_tx, mut error_rx) = mpsc::unbounded();
        let error_listener = EventListener::new(&inner, "error", move |event| {
            error_tx
                .clone()
                .unbounded_send((***event).to_owned())
                .unwrap_throw();
        });

        select! {
            _ = open_rx.fuse() => (),
            err = error_rx.next() => {
                return Err(err.unwrap_throw());
            },
        };

        let (message_tx, message_rx) = mpsc::unbounded();
        let message_listener = EventListener::new(&inner, "message", move |event| {
            let message: &web_sys::MessageEvent = event.dyn_ref().unwrap_throw();
            message_tx
                .clone()
                .unbounded_send(message.data())
                .unwrap_throw();
        });

        let (close_tx, close_rx) = oneshot::channel();
        let close_listener = EventListener::once(&inner, "close", move |event| {
            close_tx.send(()).unwrap_throw();
        });

        Ok(WebSocket {
            inner,
            close_listener,
            close_rx,
            pending_close: None,
            message_listener,
            message_rx,
            pending_message: None,
            error_listener,
            error_rx,
            closed: false,
        })
    }

    /// Initiate closing of the connection. The websocket should only be dropped once the stream
    /// has been exhausted.
    pub fn close(self) {
        self.inner.close().expect_throw(
            "we are not using code or reason, so they cannot be incorrectly formatted",
        );
    }
}

impl Stream for WebSocket {
    type Item = Result<Vec<u8>, JsValue>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.closed {
            return Poll::Ready(None);
        }

        if let Some(msg) = self.pending_message.take() {
            return Poll::Ready(Some(Ok(decode(msg))));
        }

        // check this last so that the last event is a close event.
        if let Some(msg) = self.pending_close.take() {
            self.closed = true;
            return Poll::Ready(None);
        }

        match (
            Stream::poll_next(Pin::new(&mut self.error_rx), cx),
            Stream::poll_next(Pin::new(&mut self.message_rx), cx),
            Future::poll(Pin::new(&mut self.close_rx), cx),
        ) {
            (Poll::Ready(error), message_poll, close_poll) => {
                let error =
                    error.expect_throw("the error channel should never be polled when closed");
                if let Poll::Ready(msg) = message_poll {
                    self.pending_message = Some(
                        msg.expect_throw("the message channel should never be polled when closed"),
                    ); // we know the old value is none from before.
                }
                if let Poll::Ready(close) = close_poll {
                    self.pending_close = Some(
                        close.expect_throw("the close channel should never be polled when closed"),
                    );
                }
                Poll::Ready(Some(Err(error)))
            }
            (Poll::Pending, Poll::Ready(message), close_poll) => {
                let message =
                    message.expect_throw("the message channel should never be polled when closed");
                if let Poll::Ready(close) = close_poll {
                    self.pending_close = Some(
                        close.expect_throw("the close channel should never be polled when closed"),
                    );
                }
                Poll::Ready(Some(Ok(decode(message))))
            }
            (Poll::Pending, Poll::Pending, Poll::Ready(_)) => {
                self.closed = true;
                Poll::Ready(None)
            }
            (Poll::Pending, Poll::Pending, Poll::Pending) => Poll::Pending,
        }
    }
}

impl FusedStream for WebSocket {
    fn is_terminated(&self) -> bool {
        self.closed
    }
}

impl Sink<Vec<u8>> for WebSocket {
    type Error = JsValue;

    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // because we only return a websocket once setup is complete, this always returns Ok. todo
        // investigate getting rid of async in `connect` and instead using this function.
        Poll::Ready(Ok(()))
    }

    fn start_send(self: Pin<&mut Self>, mut item: Vec<u8>) -> Result<(), Self::Error> {
        self.inner.send_with_u8_array(&mut item)
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // once we have sent a message, we get no confirmation of whether sending was successful.
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        // todo investigate sending the close message.
        Poll::Ready(Ok(()))
    }
}

/// Get byte data out of a JsValue
fn decode(val: JsValue) -> Vec<u8> {
    js_sys::Uint8Array::new(&val).to_vec()
}
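
Regarding the synchronous-new change mentioned above the code: a rough sketch (assuming the same inner: web_sys::WebSocket field) of how poll_ready could take over the waiting instead of new awaiting the "open" event:

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), JsValue>> {
    match self.inner.ready_state() {
        web_sys::WebSocket::OPEN => Poll::Ready(Ok(())),
        web_sys::WebSocket::CONNECTING => {
            // A full implementation would hand the task's waker to the "open"
            // listener so the task is woken once the socket actually opens.
            Poll::Pending
        }
        _ => Poll::Ready(Err(JsValue::from_str("websocket is closed or closing"))),
    }
}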


derekdreery avatar derekdreery commented on July 30, 2024

Writing this was hard and time-consuming, and so I would definitely have appreciated it if gloo had provided an abstraction I could just use! :)


derekdreery avatar derekdreery commented on July 30, 2024

@najamelan cool, would you think about helping with the gloo effort, to get something into here?


najamelan avatar najamelan commented on July 30, 2024

@derekdreery I tried, but the WebAssembly WG preferred to roll their own version, of which this issue is the result.

If gloo wants to adopt ws_stream_wasm, that's fine by me, and if they intend to maintain it and keep it working, they can even run rustfmt on it. It can be renamed and I can deprecate the current crate.

Or it could just be renamed to gloo-websocket, moved into this repository, and I would continue to maintain it. Currently it does depend on a server backend in ws_stream_tungstenite for testing, and the two crates make sense to be used in tandem, since otherwise there is no websocket crate that provides AsyncRead/AsyncWrite on the server side.

