Skip to content

Commit 95f8362

Browse files
committed
WIP
1 parent 8494cf6 commit 95f8362

File tree

4 files changed

+144
-22
lines changed

4 files changed

+144
-22
lines changed

services/payments/src/simcore_service_payments/core/application.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from fastapi import FastAPI
22
from servicelib.fastapi.openapi import override_fastapi_openapi_method
3+
from simcore_service_payments.services.socketio import setup_socketio
34

45
from .._meta import (
56
API_VERSION,
@@ -54,6 +55,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
5455

5556
# Listening to Rabbitmq
5657
setup_auto_recharge_listener(app)
58+
setup_socketio(app)
5759

5860
# ERROR HANDLERS
5961
# ... add here ...

services/payments/src/simcore_service_payments/services/socketio.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import logging
2+
from collections.abc import Sequence
3+
from typing import Any, Final, TypedDict
24

35
import socketio
46
from fastapi import FastAPI
7+
from fastapi.encoders import jsonable_encoder
8+
from models_library.api_schemas_webserver.wallets import PaymentTransaction
9+
from models_library.users import UserID
10+
from servicelib.json_serialization import json_dumps
11+
from servicelib.utils import logged_gather
512
from settings_library.rabbit import RabbitSettings
613

714
from .rabbitmq import get_rabbitmq_settings
815

916
_logger = logging.getLogger(__name__)
1017

1118

19+
SOCKET_IO_PAYMENT_COMPLETED_EVENT: Final[str] = "paymentCompleted"
20+
SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT: Final[str] = "paymentMethodAcknoledged"
21+
22+
1223
def setup_socketio(app: FastAPI):
1324
settings: RabbitSettings = get_rabbitmq_settings(app)
1425

@@ -31,5 +42,53 @@ async def _on_shutdown() -> None:
3142
app.add_event_handler("shutdown", _on_shutdown)
3243

3344

34-
async def emit_to_frontend(app: FastAPI, event_name: str, data: dict, to=None):
45+
async def emit_to_frontend(
46+
app: FastAPI, event_name: str, data: dict, to: str | None = None
47+
):
48+
49+
# Send messages to clients from external processes, such as Celery workers or auxiliary scripts.
3550
return await app.state.external_sio.emit(event_name, data=data, to=to)
51+
52+
53+
async def notify_payment_completed(
54+
app: FastAPI,
55+
*,
56+
user_id: UserID,
57+
payment: PaymentTransaction,
58+
):
59+
assert payment.completed_at is not None # nosec
60+
61+
messages: list[SocketMessageDict] = [
62+
{
63+
"event_type": SOCKET_IO_PAYMENT_COMPLETED_EVENT,
64+
"data": jsonable_encoder(payment, by_alias=True),
65+
}
66+
]
67+
await send_messages(app, user_id, messages)
68+
69+
70+
class SocketMessageDict(TypedDict):
71+
event_type: str
72+
data: dict[str, Any]
73+
74+
75+
async def send_messages(
76+
app: FastAPI, user_id: UserID, messages: Sequence[SocketMessageDict]
77+
) -> None:
78+
79+
sio = app.state.external_sio
80+
81+
socket_ids: list[str] = []
82+
# with managed_resource(user_id, None, app) as user_session:
83+
# socket_ids = await user_session.find_socket_ids()
84+
85+
await logged_gather(
86+
*(
87+
sio.emit(message["event_type"], data=json_dumps(message["data"]), room=sid)
88+
for message in messages
89+
for sid in socket_ids
90+
),
91+
reraise=False,
92+
log=_logger,
93+
max_concurrency=100,
94+
)

services/payments/tests/unit/test_services_socketio.py

Lines changed: 78 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,13 @@
1+
# pylint: disable=redefined-outer-name
2+
# pylint: disable=unused-argument
3+
# pylint: disable=unused-variable
4+
# pylint: disable=too-many-arguments
5+
6+
7+
import asyncio
18
from collections.abc import Callable
9+
from typing import Any
10+
from unittest.mock import AsyncMock
211

312
import pytest
413
import socketio
@@ -9,6 +18,7 @@
918
from pytest_simcore.helpers.utils_envs import setenvs_from_dict
1019
from settings_library.rabbit import RabbitSettings
1120
from simcore_service_payments.services.socketio import (
21+
SOCKET_IO_PAYMENT_COMPLETED_EVENT,
1222
emit_to_frontend,
1323
get_rabbitmq_settings,
1424
)
@@ -24,6 +34,7 @@
2434
def app_environment(
2535
monkeypatch: pytest.MonkeyPatch,
2636
app_environment: EnvVarsDict,
37+
with_disabled_postgres: None,
2738
rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service
2839
):
2940
# set environs
@@ -44,7 +55,7 @@ async def test_socketio_setup():
4455

4556

4657
@pytest.fixture
47-
async def socketio_aiohttp_server(app: FastAPI, aiohttp_server: Callable) -> TestServer:
58+
async def socketio_server(app: FastAPI, aiohttp_server: Callable) -> TestServer:
4859
"""
4960
this emulates the webserver setup: socketio server with
5061
an aiopika manager that attaches an aiohttp web app
@@ -60,35 +71,81 @@ async def socketio_aiohttp_server(app: FastAPI, aiohttp_server: Callable) -> Tes
6071
client_manager=server_manager,
6172
)
6273

63-
@sio_server.event()
64-
async def connect(sid, environ):
65-
...
74+
@sio_server.event
75+
async def connect(sid: str, environ):
76+
print("connecting", sid)
77+
78+
@sio_server.on(SOCKET_IO_PAYMENT_COMPLETED_EVENT)
79+
async def on_payment(sid, data):
80+
print(sid, Any)
81+
82+
@sio_server.event
83+
async def disconnect(sid: str):
84+
print("disconnecting", sid)
6685

6786
sio_server.attach(aiohttp_app)
6887

88+
# starts server
6989
return await aiohttp_server(aiohttp_app)
7090

7191

72-
async def test_emit_socketio_event_to_front_end(
73-
app: FastAPI, socketio_aiohttp_server: TestServer
92+
@pytest.fixture
93+
async def create_sio_client(socketio_server: TestServer):
94+
server_url = socketio_server.make_url("/")
95+
_clients = []
96+
97+
async def _():
98+
cli = socketio.AsyncClient(
99+
logger=True,
100+
engineio_logger=True,
101+
)
102+
103+
# https://python-socketio.readthedocs.io/en/stable/client.html#connecting-to-a-server
104+
# Allows WebSocket transport and disconnect HTTP long-polling
105+
await cli.connect(f"{server_url}", transports=["websocket"])
106+
107+
_clients.append(cli)
108+
109+
return cli
110+
111+
yield _
112+
113+
for client in _clients:
114+
await client.disconnect()
115+
116+
117+
async def test_emit_message_as_external_process_to_frontend_client(
118+
app: FastAPI, create_sio_client: Callable
74119
):
75-
server_url = socketio_aiohttp_server.make_url("/")
120+
"""
121+
front-end -> socketio client (many different clients)
122+
webserver -> socketio server (one/more replicas)
123+
payments -> Sends messages to clients from external processes (one/more replicas)
124+
"""
76125

77-
# create a client
78-
async with socketio.AsyncSimpleClient(logger=True, engineio_logger=True) as sio:
126+
# emulates front-end receiving message
127+
client_1: socketio.AsyncClient = await create_sio_client()
79128

80-
# https://python-socketio.readthedocs.io/en/stable/client.html#connecting-to-a-server
81-
# connect to a server
82-
await sio.connect(server_url, transports=["websocket"])
83-
session_client_id = sio.sid
129+
@client_1.on(SOCKET_IO_PAYMENT_COMPLETED_EVENT)
130+
async def on_event(data):
131+
print("client1", data)
84132

85-
# emit from external
86-
await emit_to_frontend(
87-
app, event_name="event", data={"foo": "bar"}, to=session_client_id
88-
)
133+
on_event_spy = AsyncMock(wraps=on_event)
134+
135+
await client_1.emit(SOCKET_IO_PAYMENT_COMPLETED_EVENT, data="hoi1")
136+
137+
# TODO: better to do this from a different process??
138+
# emit from external process
139+
await emit_to_frontend(
140+
app,
141+
event_name=SOCKET_IO_PAYMENT_COMPLETED_EVENT,
142+
data={"foo": "bar"},
143+
# to=client_1.sid,
144+
)
145+
146+
await client_1.emit(SOCKET_IO_PAYMENT_COMPLETED_EVENT, data="hoi2")
89147

90-
# client receives it
91-
event: list = await sio.receive(timeout=5)
92-
event_name, *event_kwargs = event
148+
await client_1.sleep(1)
149+
await asyncio.sleep(1)
93150

94-
assert event_name == "event"
151+
on_event_spy.assert_called()

services/web/server/src/simcore_service_webserver/socketio/messages.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
_logger = logging.getLogger(__name__)
2020

21+
22+
#
23+
# List of socket-io event names
24+
#
2125
SOCKET_IO_EVENT: Final[str] = "event"
2226
SOCKET_IO_HEARTBEAT_EVENT: Final[str] = "set_heartbeat_emit_interval"
2327
SOCKET_IO_LOG_EVENT: Final[str] = "logger"

0 commit comments

Comments
 (0)