fastapi-distributed-websocket's People

Contributors

andrewthetechie, dependabot[bot], dontpanico

fastapi-distributed-websocket's Issues

Update `WebSocketProxy.__call__` to use `asyncio.TaskGroup`

Feature or Enhancement

Move from asyncio.gather to asyncio.TaskGroup in WebSocketProxy.__call__.

Pitch

asyncio.TaskGroup (only available with Python >= 3.11) has more structured cancellation logic and, according to the Python docs, should be preferred over asyncio.gather when there is no reason to favor one over the other.

  • Important: this will require the next release to drop support for Python < 3.11.

Currently, the implementation is:

async def _forward(
    client: WebSocket, target: websockets.WebSocketClientProtocol
) -> None:
    ...

async def _reverse(
    client: WebSocket, target: websockets.WebSocketClientProtocol
) -> None:
    ...


class WebSocketProxy:
    ...

    async def __call__(self) -> None:
        async with websockets.connect(self._server_endpoint) as target:
            self._forward_task = asyncio.create_task(
                _forward(self._client, target)
            )
            self._reverse_task = asyncio.create_task(
                _reverse(self._client, target)
            )
            await asyncio.gather(self._forward_task, self._reverse_task)

With asyncio.TaskGroup, it would look like this:

...

    async def __call__(self) -> None:
        async with asyncio.TaskGroup() as tg:
            async with websockets.connect(self._server_endpoint) as target:
                self._forward_task = tg.create_task(
                    _forward(self._client, target)
                )
                self._reverse_task = tg.create_task(
                    _reverse(self._client, target)
                )

Entering websockets.connect inside the TaskGroup context ensures that if any failure occurs with target, our child tasks (and the parent too) are properly cancelled.

Readme code does not reflect how methods should actually be used

Feature or enhancement

Make the README.md examples consistent with how the library actually works.

Issue

One of the examples of how to receive data in the README.md is as follows:

@app.websocket('/ws/{conn_id}')
async def websocket_endpoint(
    ws: WebSocket,
    conn_id: str,
    *,
    topic: Optional[Any] = None,
) -> None:
    connection: Connection = await manager.new_connection(ws, conn_id)
    try:
        while True:
            msg = await connection.receive_json()
            await manager.broadcast(msg)
    except WebSocketDisconnect:
        await manager.remove_connection(connection)

However, remove_connection() is not async, so it should not be awaited. Furthermore, msg in the example is a dict, and since the broadcast() method only accepts the Message type, this can't actually be used. The topic param isn't passed to new_connection() either.

The next example has similar problems:

@app.websocket('/ws/{conn_id}')
async def websocket_endpoint(
    ws: WebSocket,
    conn_id: str,
    *,
    topic: Optional[Any] = None,
) -> None:
    connection: Connection = await manager.new_connection(ws, conn_id)
    # This is the preferred way of handling WebSocketDisconnect
    async for msg in connection.iter_json():
        await manager.receive(connection, msg)
    await manager.remove_connection(connection)

remove_connection() is not async.
iter_json() is also returning dicts which don't fit with the receive() method.
topic param is unused.

Pitch

The first example could become something like this:

@app.websocket('/ws/{conn_id}')
async def websocket_endpoint(
    ws: WebSocket,
    conn_id: str,
    *,
    topic: Optional[Any] = None,
) -> None:
    connection: Connection = await manager.new_connection(ws, conn_id, topic)
    try:
        while True:
            data = await connection.receive_json()
            msg = Message.from_client_message(data=data)
            manager.broadcast(msg)
    except WebSocketDisconnect:
        manager.remove_connection(connection)

There are a lot of potential solutions, though. You could:

  • Add a receive_message() method to abstract the conversion to Message object
  • Change broadcast() to detect the type of the message arg
  • Add a broadcast_json() method instead

For the second example, it could be something like this:

@app.websocket('/ws/{conn_id}')
async def websocket_endpoint(
    ws: WebSocket,
    conn_id: str,
    *,
    topic: Optional[Any] = None,
) -> None:
    connection: Connection = await manager.new_connection(ws, conn_id, topic)
    # This is the preferred way of handling WebSocketDisconnect
    async for msg in connection:
        await manager.receive(connection, msg)
    manager.remove_connection(connection)

Just a side note, but it could be cool to see some sort of context manager, or an iterator wrapper, that removes the connection automatically on WebSocketDisconnect.
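One possible shape for such a context manager is sketched below. Everything here is an assumption for illustration: `managed_connection` is a hypothetical helper, `StubManager` only mimics the awaitable new_connection() and synchronous remove_connection() described in this issue, and the local `WebSocketDisconnect` class stands in for the Starlette exception so the sketch runs without dependencies.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class WebSocketDisconnect(Exception):
    """Stand-in for starlette.websockets.WebSocketDisconnect."""


class StubManager:
    """Hypothetical manager exposing only the two methods used below."""

    def __init__(self) -> None:
        self.active: list[Any] = []

    async def new_connection(self, ws: Any, conn_id: str, topic: Any = None) -> Any:
        conn = (ws, conn_id, topic)
        self.active.append(conn)
        return conn

    def remove_connection(self, conn: Any) -> None:
        self.active.remove(conn)


@asynccontextmanager
async def managed_connection(
    manager: Any, ws: Any, conn_id: str, topic: Any = None
) -> AsyncIterator[Any]:
    conn = await manager.new_connection(ws, conn_id, topic)
    try:
        yield conn
    except WebSocketDisconnect:
        # A disconnect is the normal way out of the receive loop.
        pass
    finally:
        # Runs on disconnect, on other errors, and on normal exit.
        manager.remove_connection(conn)
```

An endpoint body would then reduce to `async with managed_connection(manager, ws, conn_id, topic) as connection:` followed by the receive loop, with no explicit cleanup call.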

Decouple `Message` objects initialisation from `WebSocketManager`

Feature or enhancement

Initialize Message objects in Connection.iter_json instead of WebSocketManager.receive

Pitch

Currently, Message objects are initialized from client data in WebSocketManager.receive after a validation step (which might raise a ValueError).
While that's totally fine, a better approach is to decouple message handling from message initialisation.
We can make Connection.iter_json yield Message instances and WebSocketManager.receive accept a Message instance (either instantiated manually by the user or produced by the iter_json generator).

Example Usage:

# imports and WebSocketManager initialisation
# skipped for brevity

async def app(scope, receive, send):
    websocket = WebSocket(scope, receive, send)
    conn = await manager.new_connection(websocket, 'conn_id')
    async for msg in conn.iter_json():
        await manager.receive(msg)
    manager.remove_connection(conn)

Update

Users might not want to use WebSocketManager.receive and instead call send, broadcast, etc. directly.
With the above implementation, that starts to become expensive (users of iter_json would then have to convert Message back to dict).
So we should leave iter_json as is and implement an async iterator for the Connection class. This involves a small change in Message.from_client_message, switching from dict.get(k) to dict.pop(k, None); that's strictly out of scope for this issue, but it's OK to have it done here.
Also, message validation should happen in __anext__, so we can return None to the caller and/or send a message to the client saying that the data was not formatted as expected.

# ./_connection.py
...
class Connection:
    ...
    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            data = await self.receive_json()
            validate_incoming_message(data)
            return Message.from_client_message(data=data)
        except ValueError as exc:
            await self.send_json({'err': str(exc)})  # maybe?
            return None
        except WebSocketDisconnect:
            raise StopAsyncIteration from None
    ...
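The effect of switching from dict.get(k) to dict.pop(k, None) can be illustrated in isolation. This is only a sketch: the key names 'type' and 'topic' and both helper functions are assumptions made for the example, not the actual body of Message.from_client_message.

```python
from typing import Any


def split_with_get(data: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Read routing keys with get(): they stay in the payload as well."""
    headers = {"type": data.get("type"), "topic": data.get("topic")}
    return headers, data


def split_with_pop(data: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Read routing keys with pop(): the payload keeps only the rest."""
    payload = dict(data)  # copy, so the caller's dict is left untouched
    headers = {
        "type": payload.pop("type", None),
        "topic": payload.pop("topic", None),
    }
    return headers, payload


if __name__ == "__main__":
    incoming = {"type": "send", "topic": "chat", "text": "hi"}
    print(split_with_get(dict(incoming)))
    print(split_with_pop(dict(incoming)))
```

With pop, the routing fields are stored once on the message object rather than duplicated inside its data, and a missing key still yields None instead of raising KeyError.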

Please Provide Example with multiple workers

Thank you for this wonderful project.
I was trying to send a message to all active connections, but it doesn't work when I run multiple uvicorn workers.
Could you please share some example code?
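For context, here is a toy model of why a plain broadcast stops at the worker boundary. It does not use the library's API: `Worker` and `Broker` are hypothetical classes, and each "connection" is just a list acting as an inbox. It assumes, as is normally the case, that each uvicorn worker is a separate process with its own in-memory connection list.

```python
class Worker:
    """Models one uvicorn worker process and its local connections."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.connections: list[list[str]] = []

    def local_broadcast(self, msg: str) -> None:
        # Only reaches clients connected to this worker.
        for inbox in self.connections:
            inbox.append(msg)


class Broker:
    """Models a shared broker (e.g. Redis pub/sub) all workers subscribe to."""

    def __init__(self) -> None:
        self.subscribers: list[Worker] = []

    def subscribe(self, worker: Worker) -> None:
        self.subscribers.append(worker)

    def publish(self, msg: str) -> None:
        # Every worker relays the message to its own local clients.
        for worker in self.subscribers:
            worker.local_broadcast(msg)


if __name__ == "__main__":
    w1, w2 = Worker("w1"), Worker("w2")
    a, b = [], []  # client a is on w1, client b is on w2
    w1.connections.append(a)
    w2.connections.append(b)

    w1.local_broadcast("local only")
    broker = Broker()
    broker.subscribe(w1)
    broker.subscribe(w2)
    broker.publish("to everyone")
    print(a, b)
```

In a real deployment, the shared broker has to live outside the worker processes; an in-memory broker is private to each worker and cannot bridge them.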

Switch to tests that don't require Docker

Rationale

At the moment, the tests require Docker and docker-compose to run. This makes it difficult to rely on GitHub Actions workflows for testing.
Even though those tests faithfully reproduce the most common real-world use case, it would be better to have a test suite that can be run with pytest only (using starlette.testclient.TestClient where needed).
We also need to improve coverage, which will require a lot of functional tests (e.g. tests for matches, untag_broker_message and so on). Docker becomes an unreasonable overhead when it is strictly required by only a very small segment of the test suite.

Normalize broker interfaces `__init__` signatures

Feature or enhancement

Ensure that __init__ method signature of interfaces inheriting from BrokerInterface contains a positional arg for the broker url.

Pitch

Interfaces inheriting from BrokerInterface currently have different __init__ signatures. We should at least make sure that they accept a positional parameter for the broker url.
That will also improve the _init_broker method used by WebSocketManager.
We could then accept *args, **kwargs for user convenience:

# ./_broker.py

class BrokerInterface(ABC):
    @abstractmethod
    def __init__(self, broker_url: str, *args, **kwargs):
        ...
# Also `InMemoryBroker` and `RedisBroker`
# should be updated to match that

# ./manager.py

# this also requires the `create_broker` signature to
# be updated to accept `*args`.
def _init_broker(url: str, broker_class: Any | None = None, *args, **kwargs) -> BrokerT:
    if broker_class:
        assert is_valid_broker(
            broker_class
        ), 'Invalid broker class. Use distributed_websocket.utils.is_valid_broker to check if your broker_class is valid.'
    broker_factory = broker_class or create_broker
    return broker_factory(url, *args, **kwargs)
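To show how the normalized signature would be used end to end, here is a self-contained sketch. `DemoBroker`, its `timeout` option, and the simplified `is_valid_broker` and `create_broker` bodies are all hypothetical stand-ins written for this example; only the overall shape follows the proposal above.

```python
from abc import ABC, abstractmethod
from typing import Any


class BrokerInterface(ABC):
    @abstractmethod
    def __init__(self, broker_url: str, *args: Any, **kwargs: Any) -> None:
        ...


class DemoBroker(BrokerInterface):
    """Hypothetical broker: url first, its own options as keywords."""

    def __init__(
        self, broker_url: str, *args: Any, timeout: float = 5.0, **kwargs: Any
    ) -> None:
        self.broker_url = broker_url
        self.timeout = timeout


def is_valid_broker(broker_class: Any) -> bool:
    # Simplified stand-in for the library's check.
    return isinstance(broker_class, type) and issubclass(
        broker_class, BrokerInterface
    )


def create_broker(url: str, *args: Any, **kwargs: Any) -> BrokerInterface:
    # Stand-in default factory; the real one would pick a broker from the url.
    return DemoBroker(url, *args, **kwargs)


def init_broker(
    url: str, broker_class: Any = None, *args: Any, **kwargs: Any
) -> BrokerInterface:
    if broker_class is not None and not is_valid_broker(broker_class):
        raise TypeError("Invalid broker class.")
    factory = broker_class or create_broker
    # Every broker takes the url first, so one call shape fits all of them.
    return factory(url, *args, **kwargs)


if __name__ == "__main__":
    broker = init_broker("redis://localhost", DemoBroker, timeout=1.0)
    print(type(broker).__name__, broker.broker_url, broker.timeout)
```

Two details are worth noting. Because broker_class precedes *args, the first extra positional argument always binds to broker_class, so broker-specific options are safest passed as keywords. The sketch also raises TypeError instead of using assert, since assertions are stripped when Python runs with -O.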
