Skip to content

Is there a test client? #332

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
jacopofar opened this issue Aug 8, 2019 · 29 comments
Closed

Is there a test client? #332

jacopofar opened this issue Aug 8, 2019 · 29 comments
Assignees

Comments

@jacopofar
Copy link
Contributor

Hello and thanks for the library!

I'm using it with Starlette and trying to implement some integration test. Is there a test client for socketio similar to the one they provide for the basic HTTP/websocket (here), or examples about how to implement such a test?

@miguelgrinberg
Copy link
Owner

miguelgrinberg commented Aug 8, 2019

There is currently no test client. I have one for the Flask integration with this package, it would be nice to have something similar that is generic.

The best approximation is to use a real server with a real client, both possibly running within the same process.

@jacopofar
Copy link
Contributor Author

Thanks. I tried to implement the test in such a way, but cannot manage to have the ASGI server running in a separate thread or process while the test client connects to it.

My test looks like this:

@pytest.mark.asyncio
async def test_websocket():
    config = uvicorn.Config(app.create_app(), host='localhost', port=8000)
    server = uvicorn.Server(config)
    loop = asyncio.get_running_loop()

    serve_coroutine = await server.serve()
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=5,
    )
    loop.run_in_executor(executor, serve_coroutine)

    # this line is reached only when I press Ctrl-C and kill the server

    async def connect_to_ws():
        sio = socketio.Client()
        sio.connect('http://localhost:8000')
        # here would go assertions on the socket responses

    k = await connect_to_ws()
    loop.run_in_executor(executor, k)

the app.create_app function is:

def create_app():

    app = Starlette(debug=True)
    app.mount('/', StaticFiles(directory='static'), name='static')

    sio = socketio.AsyncServer(async_mode='asgi')
    extended_app = socketio.ASGIApp(sio, app)

    # here define HTTP and Socketio handlers

    return extended_app

the basic idea is to start the complete server and just connect to it, I assumed I could run the server and the test client in the same event loop but apparently when I run the server (that is indeed started and I can reach with the browser) it blocks the test code. Only when I use Ctrl-C to stop it the server is killed and the rest of the test runs but of course it doesn't find the server.

Probably I'm missing something essential here, I expected the server and the test client to run concurrently on the same event loop without need for multithreading or multiprocessing.

@miguelgrinberg
Copy link
Owner

miguelgrinberg commented Aug 22, 2019

In your example you are using a process executor, so you are in fact using multiprocessing there. I think this can be done in a much simpler way. Here is a rough attempt that appears to be work well:

import asyncio
import socketio

sio = socketio.AsyncServer(async_mode='asgi', monitor_clients=False)
app = socketio.ASGIApp(sio)

def start_server():
    import asyncio
    from uvicorn import Config, Server
    config = Config(app, host='127.0.0.1', port=5000)
    server = Server(config=config)
    config.setup_event_loop()
    loop = asyncio.get_event_loop()
    server_task = server.serve()
    asyncio.ensure_future(server_task)
    return server_task

async def run_client():
    client = socketio.AsyncClient()
    await client.connect('http://localhost:5000')
    await asyncio.sleep(5)
    await client.disconnect()

start_server()
loop = asyncio.get_event_loop()
loop.run_until_complete(run_client())

Hopefully this will get you started.

@jacopofar
Copy link
Contributor Author

jacopofar commented Aug 22, 2019

Thanks a lot! Indeed from this example I was able to make it work :)

For whoever will encounter the problem in the future, in case someone in the future is interested this is my implementation:

The app:

def create_app():

    app = Starlette(debug=True)
    app.mount('/', StaticFiles(directory='static'), name='static')

    sio = socketio.AsyncServer(async_mode='asgi')
    extended_app = socketio.ASGIApp(sio, app)

    @sio.on('double')
    async def double(sid, data):
        logging.info(f"doubling for {sid}")
        return 'DOUBLED:' + data * 2
   # here add HTTP and WS handlers...
   return extended_app

The test, based on pytest-asyncio, uses an async ficture to start and stop the server has this structure:

import asyncio

import socketio
import uvicorn
import pytest

from myapp import app

def get_server():
    config = uvicorn.Config(app.create_app(), host='localhost', port=8000)
    server = uvicorn.Server(config=config)
    config.setup_event_loop()
    return server


@pytest.fixture
async def async_get_server():
    server = get_server()
    server_task = server.serve()
    asyncio.ensure_future(server_task)

@pytest.mark.asyncio
async def test_websocket(async_get_server):
    client = socketio.AsyncClient()
    await client.connect('http://localhost:8000')
    result = await client.call('double', 'hello')
    assert result == 'DOUBLED:hellohello'
    await client.disconnect()

This work although it produces a lot of warnings (I suspect some problem with the logs).

A weird thing I noticed is that to run this it is required to install aiohttp, if not I get timeout errors. Could it be worth to raise an explicit error in this case? I can try and do a PR if it's fine for you

@miguelgrinberg
Copy link
Owner

The aiohttp package provides the WebSocket client, without it the connection stays as long-polling. Not sure why the timeouts without it however, I'll have to test that.

@nbanmp
Copy link

nbanmp commented Mar 26, 2020

I'm at a point where having a test client with socketio would be really helpful for me too. Is there any update on the progress of this?

@miguelgrinberg
Copy link
Owner

@nbanmp I am not currently working on a test client. The main reason is that there is a real Python client now. Is there anything that prevents you from using the real client against your real server for tests?

@nbanmp
Copy link

nbanmp commented Mar 26, 2020

Thanks for the update.

Running the real client against the real server has some issues, the main one for me is that it is more difficult to run multiple tests asynchronously. But also important is that, I would like my unit tests to be as independent as possible, and I was expecting to run the actual server for integration testing.

@miguelgrinberg
Copy link
Owner

In any case, the test client that exists in the Flask-SocketIO package is very limited, if/when I get to do a generic test client it would be based on the real client talking to the real server. It would make it easier to start/stop the server for each test, but other than that I expect it will be the real thing, not a fake.

@databasedav
Copy link
Contributor

There should to be a way to gracefully shutdown the server given the setup above. This would need to cancel the _service_task and disconnect the remaining connect clients. I can make this contribution but need some guidance on a few things, particularly how the _service_task is started from a socketio.AsyncServer; I can find the task being started in engineio.AsyncServer but not in the former.

@miguelgrinberg
Copy link
Owner

@databasedav The service task does not need to run when testing. A testing set up can be made by subclassing the Server and Client classes (and their asyncio counterparts) to re-implement the networking parts through direct calls.

@databasedav
Copy link
Contributor

@miguelgrinberg I agree; I was just talking in the context of using a live server like discussed above

@miguelgrinberg
Copy link
Owner

@databasedav start your server with monitor_clients=False to disable the service task. I'll actually add that.

@Korijn
Copy link

Korijn commented Sep 7, 2020

In your example you are using a process executor, so you are in fact using multiprocessing there. I think this can be done in a much simpler way. Here is a rough attempt that appears to be work well:

...

Hopefully this will get you started.

This definitely did the trick! For future readers, I also had to do the following:

  • Install aiohttp in order for the client to work
  • Set start_service_task = False on the server engineio object

If you also want to work with HTTP requests in the same client session, use client.eio._send_request or client.eio.http, that way things like cookies will be shared

I also used the following to shutdown after the test:

server.should_exit = True
loop.run_until_complete(server_task)

I do still wonder if it's possible to set this up directly on ASGI level, instead of actually binding to ports and hostnames...

@erny
Copy link

erny commented Oct 20, 2020

Ok, here goes a complete example, using FastAPI as the primary ASGI app and socketio as the secondary one. The chat server echoes the message to all clients.

  • UPDATE 1: includes @Korijn's improvements.
  • UPDATE 2: Now, we use a future to wait for the result
  • UPDATE 3: @Korijn's improvements (timed wait for server startup) has been replaced with a asyncio.Event sync mechanism

src/app/main.py:

import os
import socketio

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

app = FastAPI()
path = os.path.dirname(__file__)
app.mount("/static", StaticFiles(directory=os.path.join(path, "static")), name="static")

sio = socketio.AsyncServer(async_mode='asgi')
app.mount('/sio', socketio.ASGIApp(sio))  # socketio adds automatically /socket.io/ to the URL.


@sio.on('connect')
def sio_connect(sid, environ):
    """Track user connection"""
    print('A user connected')


@sio.on('disconnect')
def sio_disconnect(sid):
    """Track user disconnection"""
    print('User disconnected')


@sio.on('chat message')
async def chat_message(sid, msg):
    """Receive a chat message and send to all clients"""
    print(f"Server received: {msg}")
    await sio.emit('chat message', msg)

src/app/tests/test_chat.py:

from typing import List, Optional
# stdlib imports
import asyncio

# 3rd party imports
import pytest
import socketio
import uvicorn

# FastAPI imports
from fastapi import FastAPI

# project imports
from .. import main

PORT = 8000

# deactivate monitoring task in python-socketio to avoid errores during shutdown
main.sio.eio.start_service_task = False


class UvicornTestServer(uvicorn.Server):
    """Uvicorn test server

    Usage:
        @pytest.fixture
        async def start_stop_server():
            server = UvicornTestServer()
            await server.up()
            yield
            await server.down()
    """

    def __init__(self, app: FastAPI = main.app, host: str = '127.0.0.1', port: int = PORT):
        """Create a Uvicorn test server

        Args:
            app (FastAPI, optional): the FastAPI app. Defaults to main.app.
            host (str, optional): the host ip. Defaults to '127.0.0.1'.
            port (int, optional): the port. Defaults to PORT.
        """
        self._startup_done = asyncio.Event()
        super().__init__(config=uvicorn.Config(app, host=host, port=port))

    async def startup(self, sockets: Optional[List] = None) -> None:
        """Override uvicorn startup"""
        await super().startup(sockets=sockets)
        self.config.setup_event_loop()
        self._startup_done.set()

    async def up(self) -> None:
        """Start up server asynchronously"""
        self._serve_task = asyncio.create_task(self.serve())
        await self._startup_done.wait()

    async def down(self) -> None:
        """Shut down server asynchronously"""
        self.should_exit = True
        await self._serve_task


@pytest.fixture
async def startup_and_shutdown_server():
    """Start server as test fixture and tear down after test"""
    server = UvicornTestServer()
    await server.up()
    yield
    await server.down()


@pytest.mark.asyncio
async def test_chat_simple(startup_and_shutdown_server):
    """A simple websocket test"""

    sio = socketio.AsyncClient()
    future = asyncio.get_running_loop().create_future()

    @sio.on('chat message')
    def on_message_received(data):
        print(f"Client received: {data}")
        # set the result
        future.set_result(data)

    message = 'Hello!'
    await sio.connect(f'http://localhost:{PORT}', socketio_path='/sio/socket.io/')
    print(f"Client sends: {message}")
    await sio.emit('chat message', message)
    # wait for the result to be set (avoid waiting forever)
    await asyncio.wait_for(future, timeout=1.0)
    await sio.disconnect()
    assert future.result() == message

Here goes a test run:
$ pytest -s src/app/tests/test_chat.py

=============================================================== test session starts ================================================================
platform darwin -- Python 3.7.9, pytest-6.1.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/erevilla/Documents/proyectos/next/product/playground
plugins: cov-2.10.1, asyncio-0.14.0
collected 1 item                                                                                                                                  

src/app/tests/test_chat.py INFO:     Started server process [2927]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:5000 (Press CTRL+C to quit)
A user connected
INFO:     127.0.0.1:63597 - "GET /sio/socket.io/?transport=polling&EIO=3&t=1603207299.770268 HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 63597) - "WebSocket /sio/socket.io/" [accepted]
Client sends: Hello!
Server received and sends to all clients: Hello!
Client received: Hello!
.User disconnected
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [2927]


================================================================ 1 passed in 1.02s =================================================================

Notes:

  • Start the test without -s to hide the server output.

@Korijn
Copy link

Korijn commented Oct 20, 2020

@erny some suggestions for improvement over the hardcoded sleeps (untested), which should make your test suite faster overall:

async def wait_ready(server, interval=0.05, max_wait=5):
    i = 0
    while not server.started:
        await asyncio.sleep(interval)
        i += interval
        if i > max_wait:
            raise RuntimeError()

@pytest.fixture
async def async_get_server():
    """Start server as test fixture and tear down after test"""
    server = get_server()
    serve_task = asyncio.create_task(server.serve())
    await wait_ready(server)   # wait for the server to startup
    yield 1
    # teardown code
    server.should_exit = True
    await serve_task()  # allow server run tasks before shut down

@miguelgrinberg
Copy link
Owner

Awesome, thanks @erny! Given that this appears to be figured out, I'm going to close this issue.

@erny
Copy link

erny commented Oct 20, 2020

@Korijn I included your improvements in the updated example. Thank you very much.

@erny
Copy link

erny commented Oct 20, 2020

@Korijn, @miguelgrinberg , I was not able to remove the sio.sleep(0.1) after sio.emit. Is there any alternative?

@Korijn
Copy link

Korijn commented Oct 20, 2020

@Korijn, @miguelgrinberg , I was not able to remove the sio.sleep(0.1) after sio.emit. Is there any alternative?

You would need to wait in a loop for result.message_received to become True, very similar to how wait_ready is defined. You could pass the condition to wait for as an argument to make it reusable.

async def wait_ready(condition, interval=0.05, max_wait=5):
    i = 0
    while not condition():
        await asyncio.sleep(interval)
        i += interval
        if i > max_wait:
            raise RuntimeError()

Usage examples:

await wait_ready(lambda: server.started)
await wait_ready(lambda: result.message_received)

Also I guess you could still lower the interval quite a bit, like 0.001 or even lower maybe.

@erny
Copy link

erny commented Oct 20, 2020 via email

@Korijn
Copy link

Korijn commented Oct 20, 2020

An idea would be to define a test server app with a catch-all event handler which writes all messages it receives to a list, and a helper to wait for a new message to come in which could also return that new message.

@SeeringPhil
Copy link

SeeringPhil commented Nov 2, 2020

Would it be possible for one of you guys that have been able to make this work to list the versions of the libraries/dependencies you're using?

I attempted to replicate what's discussed here (#332 (comment)) using windows, but I keep having issues and I'm wondering if it's related to my python version or dependencies's.

Thanks !

Update:
I was able to make it work using linux (wsl2), but I'm experimenting an issue where the test takes a whole minute to "complete". In fact, I added a test case where I only changed the message to another word, and it takes exactly 2 minutes before completion. Every test stays stuck at

INFO:     Shutting down
INFO:     Waiting for background tasks to complete. (CTRL+C to force quit)

for a whole minute before going on with the next one.

@Korijn
Copy link

Korijn commented Nov 2, 2020

Put your code up somewhere so we can have a look 👍

@erny
Copy link

erny commented Nov 15, 2020

Hi.

Sorry for the late answer.

Would it be possible for one of you guys that have been able to make this work to list the versions of the libraries/dependencies you're using?

Of course, here we go:

python 3.7.9 (using a local pyenv installed instance and also using docker image python:3.7.9-slim)
uvicorn==0.12.2
  - click [required: ==7.*, installed: 7.1.2]
  - h11 [required: >=0.8, installed: 0.11.0]
  - typing-extensions [required: Any, installed: 3.7.4.3]
uvloop==0.14.0
aiofiles==0.5.0
aiohttp==3.6.3
  - async-timeout [required: >=3.0,<4.0, installed: 3.0.1]
  - attrs [required: >=17.3.0, installed: 20.2.0]
  - chardet [required: >=2.0,<4.0, installed: 3.0.4]
  - multidict [required: >=4.5,<5.0, installed: 4.7.6]
  - yarl [required: >=1.0,<1.6.0, installed: 1.5.1]
    - idna [required: >=2.0, installed: 2.10]
    - multidict [required: >=4.0, installed: 4.7.6]
    - typing-extensions [required: >=3.7.4, installed: 3.7.4.3]
fastapi==0.61.1
  - pydantic [required: >=1.0.0,<2.0.0, installed: 1.6.1]
  - starlette [required: ==0.13.6, installed: 0.13.6]
flake8==3.8.4
  - importlib-metadata [required: Any, installed: 2.0.0]
    - zipp [required: >=0.5, installed: 3.3.1]
  - mccabe [required: >=0.6.0,<0.7.0, installed: 0.6.1]
  - pycodestyle [required: >=2.6.0a1,<2.7.0, installed: 2.6.0]
  - pyflakes [required: >=2.2.0,<2.3.0, installed: 2.2.0]
httptools==0.1.1
pytest-asyncio==0.14.0
  - pytest [required: >=5.4.0, installed: 6.1.1]
    - attrs [required: >=17.4.0, installed: 20.2.0]
    - importlib-metadata [required: >=0.12, installed: 2.0.0]
      - zipp [required: >=0.5, installed: 3.3.1]
    - iniconfig [required: Any, installed: 1.1.1]
    - packaging [required: Any, installed: 20.4]
      - pyparsing [required: >=2.0.2, installed: 2.4.7]
      - six [required: Any, installed: 1.15.0]
    - pluggy [required: >=0.12,<1.0, installed: 0.13.1]
      - importlib-metadata [required: >=0.12, installed: 2.0.0]
        - zipp [required: >=0.5, installed: 3.3.1]
    - py [required: >=1.8.2, installed: 1.9.0]
    - toml [required: Any, installed: 0.10.1]
pytest-cov==2.10.1
  - coverage [required: >=4.4, installed: 5.3]
  - pytest [required: >=4.6, installed: 6.1.1]
    - attrs [required: >=17.4.0, installed: 20.2.0]
    - importlib-metadata [required: >=0.12, installed: 2.0.0]
      - zipp [required: >=0.5, installed: 3.3.1]
    - iniconfig [required: Any, installed: 1.1.1]
    - packaging [required: Any, installed: 20.4]
      - pyparsing [required: >=2.0.2, installed: 2.4.7]
      - six [required: Any, installed: 1.15.0]
    - pluggy [required: >=0.12,<1.0, installed: 0.13.1]
      - importlib-metadata [required: >=0.12, installed: 2.0.0]
        - zipp [required: >=0.5, installed: 3.3.1]
    - py [required: >=1.8.2, installed: 1.9.0]
    - toml [required: Any, installed: 0.10.1]
python-dotenv==0.14.0
python-socketio==4.6.0
  - python-engineio [required: >=3.13.0, installed: 3.13.2]
    - six [required: >=1.9.0, installed: 1.15.0]
  - six [required: >=1.9.0, installed: 1.15.0]
PyYAML==5.3.1
watchgod==0.6
websockets==8.1

(I just put the uvicorn deps on the top and skipped the mypy and jupyterlab dependencies which are very long...)

I attempted to replicate what's discussed here (#332 (comment)) using windows, but I keep having issues and I'm wondering if it's related to my python version or dependencies's.
May be...

Thanks !

Update:
I was able to make it work using linux (wsl2), but I'm experimenting an issue where the test takes a whole minute to "complete". In fact, I added a test case where I only changed the message to another word, and it takes exactly 2 minutes before completion. Every test stays stuck at

INFO:     Shutting down
INFO:     Waiting for background tasks to complete. (CTRL+C to force quit)
```0

for a whole minute before going on with the next one.

I have no "Waiting for background tasks to complete." message. Running pytest -s I get:

$ pytest -s
========================================================================================== test session starts ===========================================================================================
platform darwin -- Python 3.7.9, pytest-6.1.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/erevilla/Documents/proyectos/next/product/playground/src
plugins: cov-2.10.1, asyncio-0.14.0
collected 14 items                                                                                                                                                                                       

app/tests/test_chat.py INFO:     Started server process [33326]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
A user connected
INFO:     127.0.0.1:51226 - "GET /sio/socket.io/?transport=polling&EIO=3&t=1605442406.685194 HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 51226) - "WebSocket /sio/socket.io/" [accepted]
Client sends: Hello!
Server received and sends to all clients: Hello!
Client received: Hello!
.User disconnected
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [33326]
Chat page: /Users/erevilla/Documents/proyectos/next/product/playground/src/app/tests/../chat.html
.INFO:     Started server process [33326]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     127.0.0.1:51227 - "GET /chat HTTP/1.1" 200 OK
.INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [33326]

Searching inside the uvicorn source code we get in its main.py

        # Wait for existing tasks to complete.
        if self.server_state.tasks and not self.force_exit:
            msg = "Waiting for background tasks to complete. (CTRL+C to force quit)"
            logger.info(msg)
            while self.server_state.tasks and not self.force_exit:
                await asyncio.sleep(0.1)

It seems that I don't have a background task, but you do. What version of uvicorn are you using? Can you try to additionally set force_exit = True.

@leverage-analytics
Copy link

leverage-analytics commented Mar 4, 2025

I came across this thread as I am also working on some tests in a project that uses python-socketio. To help, I have been trying to replicate the tests shared by @erny, but I'm having issues adapting them. I was hoping someone could take a look at this and provide some feedback?

Also looping in @miguelgrinberg and @Korijn to see if you have any ideas!

import socketio
import asyncio
import pytest
import pytest_asyncio
import uvicorn
import typing as t


class UvicornTestServer(uvicorn.Server):
    def __init__(self, app, host: str = "127.0.0.1", port: int = 8000):
        """Create a Uvicorn test server

        Args:
            app: the ASGI app
            host (str, optional): the host ip. Defaults to '127.0.0.1'.
            port (int, optional): the port. Defaults to PORT.
        """
        self._startup_done = asyncio.Event()
        self._serve_task: t.Awaitable[t.Any] | None = None
        super().__init__(config=uvicorn.Config(app, host=host, port=port))

    async def startup(self, sockets: list[t.Any] | None = None) -> None:
        """Override uvicorn startup"""
        await super().startup(sockets=sockets)
        self.config.setup_event_loop()
        self._startup_done.set()

    async def up(self) -> None:
        """Start up server asynchronously"""
        self._serve_task = asyncio.create_task(self.serve())
        await self._startup_done.wait()

    async def down(self) -> None:
        """Shut down server asynchronously"""
        self.should_exit = True
        if self._serve_task:
            await self._serve_task


@pytest.fixture()
def sio() -> socketio.AsyncServer:
    """Create a Socket.IO server."""
    sio = socketio.AsyncServer(
        async_mode="asgi",
        namespaces=["/chat"],
        logger=True,
        engineio_logger=True,
    )

    @sio.on("chat message", namespace="/chat")
    async def chat_message(sid, msg):
        """Receive a chat message and send to all clients"""
        print(f"Server received: {msg}")
        await sio.emit("chat message", msg, namespace="/chat")

    return sio


@pytest_asyncio.fixture(autouse=True)
async def startup_and_shutdown_server(
    sio: socketio.AsyncServer,
) -> t.AsyncIterator[None]:
    """Start server as test fixture and tear down after test"""
    server = UvicornTestServer(socketio.ASGIApp(sio, socketio_path="/socket.io"))
    await server.up()
    yield
    await server.down()


@pytest_asyncio.fixture
async def client() -> t.AsyncIterator[socketio.AsyncClient]:
    """Create a Socket.IO client."""
    client = socketio.AsyncClient(logger=True, engineio_logger=True, ssl_verify=False)
    await client.connect("http://localhost:8000", namespaces=["/chat"])

    yield client

    # await client.wait()
    # await client.disconnect()


@pytest.mark.asyncio
async def test_emit_event_from_client2(
    client: socketio.AsyncClient,
) -> None:
    """Test emitting a message."""

    future = asyncio.get_running_loop().create_future()

    @client.on("chat message", namespace="/chat")
    def on_message_received(data):
        print(f"Client received: {data}")
        # set the result
        future.set_result(data)

    message = "Hello!"

    print(f"Client sends: {message}")
    await client.emit("chat message", message, namespace="/chat")
    # wait for the result to be set (avoid waiting forever)
    await asyncio.wait_for(future, timeout=1.0)
    await client.disconnect()
    assert future.result() == message

Here are the logs

------------------------------------------------------------------------------ Captured stdout setup ------------------------------------------------------------------------------
INFO:     127.0.0.1:39046 - "GET /socket.io/?transport=polling&EIO=4&t=1741118187.710102 HTTP/1.1" 200 OK
------------------------------------------------------------------------------ Captured stderr setup ------------------------------------------------------------------------------
Server initialized for asgi.
INFO:     Started server process [59408]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Attempting polling connection to http://localhost:8000/socket.io/?transport=polling&EIO=4
MhB57eBIrWwMvotWAAAA: Sending packet OPEN data {'sid': 'MhB57eBIrWwMvotWAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000}
Polling connection accepted with {'sid': 'MhB57eBIrWwMvotWAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000}
Engine.IO connection established
Sending packet MESSAGE data 0/chat,{}
Attempting WebSocket upgrade to ws://localhost:8000/socket.io/?transport=websocket&EIO=4
MhB57eBIrWwMvotWAAAA: Received request to upgrade to websocket
INFO:     ('127.0.0.1', 39062) - "WebSocket /socket.io/?transport=websocket&EIO=4&sid=MhB57eBIrWwMvotWAAAA&t=1741118187.7403438" [accepted]
INFO:     connection open
WebSocket upgrade was successful
MhB57eBIrWwMvotWAAAA: Upgrade to websocket successful
MhB57eBIrWwMvotWAAAA: Received packet MESSAGE data 0/chat,{}
MhB57eBIrWwMvotWAAAA: Sending packet MESSAGE data 0/chat,{"sid":"sv5-pmmoMiYWBbTmAAAB"}
Received packet NOOP data 
Received packet MESSAGE data 0/chat,{"sid":"sv5-pmmoMiYWBbTmAAAB"}
Namespace /chat is connected
------------------------------------------------------------------------------- Captured log setup --------------------------------------------------------------------------------
INFO     engineio.server:base_server.py:112 Server initialized for asgi.
INFO     engineio.client:async_client.py:237 Attempting polling connection to http://localhost:8000/socket.io/?transport=polling&EIO=4
INFO     engineio.server:async_socket.py:77 MhB57eBIrWwMvotWAAAA: Sending packet OPEN data {'sid': 'MhB57eBIrWwMvotWAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000}
INFO     engineio.client:async_client.py:264 Polling connection accepted with {'sid': 'MhB57eBIrWwMvotWAAAA', 'upgrades': ['websocket'], 'pingTimeout': 20000, 'pingInterval': 25000, 'maxPayload': 1000000}
INFO     socketio.client:async_client.py:550 Engine.IO connection established
INFO     engineio.client:async_client.py:430 Sending packet MESSAGE data 0/chat,{}
INFO     engineio.client:async_client.py:298 Attempting WebSocket upgrade to ws://localhost:8000/socket.io/?transport=websocket&EIO=4
INFO     engineio.server:async_socket.py:89 MhB57eBIrWwMvotWAAAA: Received request to upgrade to websocket
INFO     engineio.client:async_client.py:377 WebSocket upgrade was successful
INFO     engineio.server:async_socket.py:217 MhB57eBIrWwMvotWAAAA: Upgrade to websocket successful
INFO     engineio.server:async_socket.py:37 MhB57eBIrWwMvotWAAAA: Received packet MESSAGE data 0/chat,{}
INFO     engineio.server:async_socket.py:77 MhB57eBIrWwMvotWAAAA: Sending packet MESSAGE data 0/chat,{"sid":"sv5-pmmoMiYWBbTmAAAB"}
INFO     engineio.client:async_client.py:409 Received packet NOOP data 
INFO     engineio.client:async_client.py:409 Received packet MESSAGE data 0/chat,{"sid":"sv5-pmmoMiYWBbTmAAAB"}
INFO     socketio.client:async_client.py:391 Namespace /chat is connected
------------------------------------------------------------------------------ Captured stdout call -------------------------------------------------------------------------------
Client sends: Hello!
------------------------------------------------------------------------------ Captured stderr call -------------------------------------------------------------------------------
Emitting event "chat message" [/chat]
Sending packet MESSAGE data 2/chat,["chat message","Hello!"]
-------------------------------------------------------------------------------- Captured log call --------------------------------------------------------------------------------
INFO     socketio.client:async_client.py:230 Emitting event "chat message" [/chat]
INFO     engineio.client:async_client.py:430 Sending packet MESSAGE data 2/chat,["chat message","Hello!"]
---------------------------------------------------------------------------- Captured stdout teardown -----------------------------------------------------------------------------
Server received: Hello!
Client received: Hello!
---------------------------------------------------------------------------- Captured stderr teardown -----------------------------------------------------------------------------
MhB57eBIrWwMvotWAAAA: Received packet MESSAGE data 2/chat,["chat message","Hello!"]
received event "chat message" from sv5-pmmoMiYWBbTmAAAB [/chat]
emitting event "chat message" to all [/chat]
MhB57eBIrWwMvotWAAAA: Sending packet MESSAGE data 2/chat,["chat message","Hello!"]
Received packet MESSAGE data 2/chat,["chat message","Hello!"]
Received event "chat message" [/chat]
INFO:     Shutting down
Server sent close packet data 1012, aborting
Waiting for write loop task to end
INFO:     connection closed
Exiting write loop task
Engine.IO connection dropped
Connection failed, new attempt in 1.37 seconds
Exiting read loop task
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [59408]
------------------------------------------------------------------------------ Captured log teardown ------------------------------------------------------------------------------
INFO     engineio.server:async_socket.py:37 MhB57eBIrWwMvotWAAAA: Received packet MESSAGE data 2/chat,["chat message","Hello!"]
INFO     socketio.server:async_server.py:594 received event "chat message" from sv5-pmmoMiYWBbTmAAAB [/chat]
INFO     socketio.server:async_server.py:176 emitting event "chat message" to all [/chat]
INFO     engineio.server:async_socket.py:77 MhB57eBIrWwMvotWAAAA: Sending packet MESSAGE data 2/chat,["chat message","Hello!"]
INFO     engineio.client:async_client.py:409 Received packet MESSAGE data 2/chat,["chat message","Hello!"]
INFO     socketio.client:async_client.py:411 Received event "chat message" [/chat]
ERROR    asyncio:base_events.py:1821 Task exception was never retrieved
future: <Task finished name='Task-29' coro=<AsyncClient._handle_eio_message() done, defined at /home/ahlav/pyek/pyek-concierge/.venv/lib/python3.12/site-packages/socketio/async_client.py:557> exception=InvalidStateError('invalid state')>
Traceback (most recent call last):
  File "/home/ahlav/pyek/pyek-concierge/.venv/lib/python3.12/site-packages/socketio/async_client.py", line 574, in _handle_eio_message
    await self._handle_event(pkt.namespace, pkt.id, pkt.data)
  File "/home/ahlav/pyek/pyek-concierge/.venv/lib/python3.12/site-packages/socketio/async_client.py", line 412, in _handle_event
    r = await self._trigger_event(data[0], namespace, *data[1:])
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ahlav/pyek/pyek-concierge/.venv/lib/python3.12/site-packages/socketio/async_client.py", line 478, in _trigger_event
    ret = handler(*args)
          ^^^^^^^^^^^^^^
  File "/home/ahlav/pyek/pyek-concierge/tests/integration/application/message_bus/providers/test_demo2.py", line 225, in on_message_received
    future.set_result(data)
asyncio.exceptions.InvalidStateError: invalid state
WARNING  engineio.client:async_client.py:569 Server sent close packet data 1012, aborting
INFO     engineio.client:async_client.py:603 Waiting for write loop task to end
INFO     engineio.client:async_client.py:680 Exiting write loop task
INFO     socketio.client:async_client.py:587 Engine.IO connection dropped
INFO     socketio.client:async_client.py:506 Connection failed, new attempt in 1.37 seconds
INFO     engineio.client:async_client.py:613 Exiting read loop task

@miguelgrinberg
Copy link
Owner

@leverage-analytics My only suggestion would be to remove the pytest fixtures from the equation and instead try to make it work without all the magical stuff that makes fixtures work, and only then introduce fixtures one change at a time. I have shared working code above that does not use pytest that you can use as a start.

@leverage-analytics
Copy link

@miguelgrinberg thanks for the input! I appreciate your time responding and maintaining the project. I will update when I have a chance.

@leverage-analytics
Copy link

leverage-analytics commented Mar 14, 2025

I resolved my issue! I reproduced both @miguelgrinberg's example in a fresh virtual environment, and it worked. I then attempted to run the same example using the virtual environment for my package (to check if there was a problem with dependency versions), and it failed. After some careful comparison of the installed packages across the two virtual environments, I found that uvloop was installed in my virtual environment (as a dependency of uvicorn), and uvicorn, as of version 0.34.0, defaults to an event loop provided by uvloop (if installed) instead of the default asyncio loop. I realized then there must have been a discrepancy between uvloop's handling of the loop compared to asyncio. Thankfully, the Config object from uvicorn has keyword argument loop that I set to asyncio to use the loop provided by asyncio. After changing the configuration, the tests worked!

Edit: This example prints error Task was destroyed but it is pending!. After some research, I came across this discussion, and I added some cleanup to the startup_and_shutdown_server fixture.

Here is my updated example, including pytest and pytest-asyncio fixtures, for anyone who might be interested in the future!

import asyncio
import typing as t

import pytest
import pytest_asyncio
import socketio
import uvicorn


class UvicornTestServer(uvicorn.Server):
    def __init__(self, app, host: str = "127.0.0.1", port: int = 8000):
        self._startup_done = asyncio.Event()
        self._serve_task: t.Awaitable[t.Any] | None = None
        super().__init__(
            config=uvicorn.Config(app, host=host, port=port, loop="asyncio")
        )

    async def startup(self, sockets: list[t.Any] | None = None) -> None:
        """Override uvicorn startup."""
        await super().startup(sockets=sockets)
        self.config.setup_event_loop()
        self._startup_done.set()

    async def up(self) -> None:
        """Start up server asynchronously."""
        self._serve_task = asyncio.create_task(self.serve())
        await self._startup_done.wait()

    async def down(self) -> None:
        """Shut down server asynchronously."""
        self.should_exit = True
        if self._serve_task:
            await self._serve_task


@pytest_asyncio.fixture
async def sio() -> socketio.AsyncServer:
    """Create a Socket.IO server."""
    sio = socketio.AsyncServer(
        async_mode="asgi",
        monitor_clients=False,
        logger=True,
        engineio_logger=True,
    )

    @sio.on("double")
    async def double(sid, data):
        print(f"doubling for {sid}")
        return "DOUBLED:" + data * 2

    return sio


@pytest_asyncio.fixture()
async def startup_and_shutdown_server(
    sio: socketio.AsyncServer,
) -> t.AsyncIterator[None]:
    """Start server as test fixture and tear down after test."""
    server = UvicornTestServer(socketio.ASGIApp(sio))
    await server.up()
    yield
    await server.down()

    tasks = [
        task
        for task in asyncio.all_tasks(loop=asyncio.get_running_loop())
        if not task.done()
    ]
    for task in tasks:
        task.cancel()

    try:
        await asyncio.wait(tasks)
    except asyncio.CancelledError:
        # If the task was cancelled, it will raise a CancelledError
        # We can ignore this error as it is expected
        pass


@pytest_asyncio.fixture
async def client() -> t.AsyncIterator[socketio.AsyncClient]:
    """Create a Socket.IO client."""
    client = socketio.AsyncClient(logger=True, engineio_logger=True)
    await client.connect("http://localhost:8000")
    yield client


@pytest.mark.asyncio
@pytest.mark.usefixtures("startup_and_shutdown_server")
async def test_client(client: socketio.AsyncClient) -> None:
    result = await client.call("double", "hello")
    assert result == "DOUBLED:hellohello"
    await client.disconnect()

@miguelgrinberg Thanks again for your help!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

8 participants