diff --git a/.codeclimate.yml b/.codeclimate.yml
index a0936fee04a..e1c5a936f9d 100644
--- a/.codeclimate.yml
+++ b/.codeclimate.yml
@@ -59,29 +59,30 @@ plugins:
- .js
exclude_patterns:
- - "config/"
- - "db/"
- - "dist/"
- - "features/"
- - "**/node_modules/"
- - "script"
- - "**/spec/"
- - "**/test/"
- - "**/tests/"
- - "**/vendor/"
- - "**/*.d.ts"
- - "**/.venv/"
- ".venv/"
- - "**/healthcheck.py"
+ - "**/.venv/"
+ - "**/*.d.ts"
+ - "**/*.js"
- "**/client-sdk/"
- "**/generated_code/"
+ - "**/healthcheck.py"
- "**/migrations/"
- - "**/*.js"
- - "**/pytest-simcore/"
+ - "**/node_modules/"
- "**/pytest_plugin/"
+ - "**/pytest-simcore/"
- "**/sandbox/"
+ - "**/spec/"
+ - "**/test/"
+ - "**/tests/"
+ - "**/vendor/"
+ - "config/"
+ - "db/"
+ - "dist/"
+ - "features/"
+ - "script"
+ - "scripts/"
- packages/models-library/src/models_library/utils/_original_fastapi_encoders.py
+ - services/payments/scripts/example_payment_gateway.py
- services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py
- services/web/server/src/simcore_service_webserver/exporter/formatters/sds/xlsx/templates/code_description.py
- services/web/server/src/simcore_service_webserver/projects/db.py
- - "scripts/"
diff --git a/packages/models-library/src/models_library/api_schemas_payments/socketio.py b/packages/models-library/src/models_library/api_schemas_payments/socketio.py
new file mode 100644
index 00000000000..b828080f9ba
--- /dev/null
+++ b/packages/models-library/src/models_library/api_schemas_payments/socketio.py
@@ -0,0 +1,4 @@
+from typing import Final
+
+SOCKET_IO_PAYMENT_COMPLETED_EVENT: Final[str] = "paymentCompleted"
+SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT: Final[str] = "paymentMethodAcknoledged"
diff --git a/packages/models-library/src/models_library/socketio.py b/packages/models-library/src/models_library/socketio.py
new file mode 100644
index 00000000000..88b0e9a0beb
--- /dev/null
+++ b/packages/models-library/src/models_library/socketio.py
@@ -0,0 +1,6 @@
+from typing import Any, TypedDict
+
+
+class SocketMessageDict(TypedDict):
+ event_type: str
+ data: dict[str, Any]
diff --git a/packages/service-library/src/servicelib/socketio_utils.py b/packages/service-library/src/servicelib/socketio_utils.py
new file mode 100644
index 00000000000..efc63436715
--- /dev/null
+++ b/packages/service-library/src/servicelib/socketio_utils.py
@@ -0,0 +1,43 @@
+""" Common utilities for python-socketio library
+
+
+NOTE: we intentionally avoided importing socketio here to avoid adding an extra dependency at
+this level which would include python-socketio in all libraries
+"""
+
+import asyncio
+
+
+async def cleanup_socketio_async_pubsub_manager(server_manager):
+
+ # NOTE: this is ugly. It seems though that python-socketio does not
+ # cleanup its background tasks properly.
+ # https://github.com/miguelgrinberg/python-socketio/discussions/1092
+ cancelled_tasks = []
+
+ if hasattr(server_manager, "thread"):
+ server_thread = server_manager.thread
+ assert isinstance(server_thread, asyncio.Task) # nosec
+ server_thread.cancel()
+ cancelled_tasks.append(server_thread)
+
+ if server_manager.publisher_channel:
+ await server_manager.publisher_channel.close()
+
+ if server_manager.publisher_connection:
+ await server_manager.publisher_connection.close()
+
+ current_tasks = asyncio.tasks.all_tasks()
+ for task in current_tasks:
+ coro = task.get_coro()
+ if any(
+ coro_name in coro.__qualname__ # type: ignore
+ for coro_name in [
+ "AsyncServer._service_task",
+ "AsyncSocket.schedule_ping",
+ "AsyncPubSubManager._thread",
+ ]
+ ):
+ task.cancel()
+ cancelled_tasks.append(task)
+ await asyncio.gather(*cancelled_tasks, return_exceptions=True)
diff --git a/packages/service-library/src/servicelib/utils.py b/packages/service-library/src/servicelib/utils.py
index 3cbad7930db..001e183914e 100644
--- a/packages/service-library/src/servicelib/utils.py
+++ b/packages/service-library/src/servicelib/utils.py
@@ -75,7 +75,7 @@ def fire_and_forget_task(
task = asyncio.create_task(obj, name=f"fire_and_forget_task_{task_suffix_name}")
fire_and_forget_tasks_collection.add(task)
- def log_exception_callback(fut: asyncio.Future):
+ def _log_exception_callback(fut: asyncio.Future):
try:
fut.result()
except asyncio.CancelledError:
@@ -83,7 +83,7 @@ def log_exception_callback(fut: asyncio.Future):
except Exception: # pylint: disable=broad-except
_logger.exception("Error occurred while running task %s!", task.get_name())
- task.add_done_callback(log_exception_callback)
+ task.add_done_callback(_log_exception_callback)
task.add_done_callback(fire_and_forget_tasks_collection.discard)
return task
diff --git a/services/payments/gateway/Makefile b/services/payments/gateway/Makefile
index 87709160533..98c1c728461 100644
--- a/services/payments/gateway/Makefile
+++ b/services/payments/gateway/Makefile
@@ -2,13 +2,6 @@ include ../../../scripts/common.Makefile
-.PHONY: run-devel
-run-devel: ## runs example_payment_gateway server
- # SEE http://127.0.0.1:8000/docs
- set -o allexport; source .env-secret; set +o allexport; \
- uvicorn example_payment_gateway:the_app --reload
-
.PHONY: openapi.json
openapi.json: ## creates OAS
- @set -o allexport; source .env-secret; set +o allexport; \
- python example_payment_gateway.py openapi > $@
+ example_payment_gateway.py openapi > $@
diff --git a/services/payments/gateway/openapi.json b/services/payments/gateway/openapi.json
index ecc63c61c0f..e69de29bb2d 100644
--- a/services/payments/gateway/openapi.json
+++ b/services/payments/gateway/openapi.json
@@ -1,761 +0,0 @@
-{
- "openapi": "3.0.0",
- "info": {
- "title": "osparc-compliant payment-gateway",
- "version": "0.3.0"
- },
- "paths": {
- "/init": {
- "post": {
- "tags": [
- "payment"
- ],
- "summary": "Init Payment",
- "operationId": "init_payment",
- "parameters": [
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/InitPayment"
- }
- }
- },
- "required": true
- },
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/PaymentInitiated"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- },
- "/pay": {
- "get": {
- "tags": [
- "payment"
- ],
- "summary": "Get Payment Form",
- "operationId": "get_payment_form",
- "parameters": [
- {
- "required": true,
- "schema": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "name": "id",
- "in": "query"
- }
- ],
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "text/html": {
- "schema": {
- "type": "string"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "text/html": {
- "schema": {
- "type": "string"
- }
- }
- }
- }
- }
- }
- },
- "/cancel": {
- "post": {
- "tags": [
- "payment"
- ],
- "summary": "Cancel Payment",
- "operationId": "cancel_payment",
- "parameters": [
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/PaymentInitiated"
- }
- }
- },
- "required": true
- },
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/PaymentCancelled"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- },
- "/payment-methods:init": {
- "post": {
- "tags": [
- "payment-method"
- ],
- "summary": "Init Payment Method",
- "operationId": "init_payment_method",
- "parameters": [
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/InitPaymentMethod"
- }
- }
- },
- "required": true
- },
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/PaymentMethodInitiated"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- },
- "/payment-methods/form": {
- "get": {
- "tags": [
- "payment-method"
- ],
- "summary": "Get Form Payment Method",
- "operationId": "get_form_payment_method",
- "parameters": [
- {
- "required": true,
- "schema": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "name": "id",
- "in": "query"
- }
- ],
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "text/html": {
- "schema": {
- "type": "string"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "text/html": {
- "schema": {
- "type": "string"
- }
- }
- }
- }
- }
- }
- },
- "/payment-methods:batchGet": {
- "post": {
- "tags": [
- "payment-method"
- ],
- "summary": "Batch Get Payment Methods",
- "operationId": "batch_get_payment_methods",
- "parameters": [
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/BatchGetPaymentMethods"
- }
- }
- },
- "required": true
- },
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/PaymentMethodsBatch"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- },
- "/payment-methods/{id}": {
- "get": {
- "tags": [
- "payment-method"
- ],
- "summary": "Get Payment Method",
- "operationId": "get_payment_method",
- "parameters": [
- {
- "required": true,
- "schema": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "name": "id",
- "in": "path"
- },
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/GetPaymentMethod"
- }
- }
- }
- },
- "404": {
- "description": "Payment method not found: It was not added or incomplete (i.e. create flow failed or canceled)",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- },
- "delete": {
- "tags": [
- "payment-method"
- ],
- "summary": "Delete Payment Method",
- "operationId": "delete_payment_method",
- "parameters": [
- {
- "required": true,
- "schema": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "name": "id",
- "in": "path"
- },
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "responses": {
- "204": {
- "description": "Successful Response"
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- },
- "/payment-methods/{id}:pay": {
- "post": {
- "tags": [
- "payment-method"
- ],
- "summary": "Pay With Payment Method",
- "operationId": "pay_with_payment_method",
- "parameters": [
- {
- "required": true,
- "schema": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "name": "id",
- "in": "path"
- },
- {
- "required": false,
- "schema": {
- "type": "string",
- "title": "X-Init-Api-Secret"
- },
- "name": "x-init-api-secret",
- "in": "header"
- }
- ],
- "requestBody": {
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/InitPayment"
- }
- }
- },
- "required": true
- },
- "responses": {
- "200": {
- "description": "Successful Response",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/AckPaymentWithPaymentMethod"
- }
- }
- }
- },
- "4XX": {
- "description": "Client Error",
- "content": {
- "application/json": {
- "schema": {
- "$ref": "#/components/schemas/ErrorModel"
- }
- }
- }
- }
- }
- }
- }
- },
- "components": {
- "schemas": {
- "AckPaymentWithPaymentMethod": {
- "properties": {
- "success": {
- "type": "boolean",
- "title": "Success"
- },
- "message": {
- "type": "string",
- "title": "Message"
- },
- "provider_payment_id": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Provider Payment Id",
- "description": "Payment ID from the provider (e.g. stripe payment ID)"
- },
- "invoice_url": {
- "type": "string",
- "maxLength": 2083,
- "minLength": 1,
- "format": "uri",
- "title": "Invoice Url",
- "description": "Link to invoice is required when success=true"
- },
- "payment_id": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Payment Id",
- "description": "Payment ID from the gateway"
- }
- },
- "type": "object",
- "required": [
- "success"
- ],
- "title": "AckPaymentWithPaymentMethod",
- "example": {
- "success": true,
- "provider_payment_id": "pi_123ABC",
- "invoice_url": "https://invoices.com/id=12345",
- "payment_id": "D19EE68B-B007-4B61-A8BC-32B7115FB244"
- }
- },
- "BatchGetPaymentMethods": {
- "properties": {
- "payment_methods_ids": {
- "items": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1
- },
- "type": "array",
- "title": "Payment Methods Ids"
- }
- },
- "type": "object",
- "required": [
- "payment_methods_ids"
- ],
- "title": "BatchGetPaymentMethods"
- },
- "ErrorModel": {
- "properties": {
- "message": {
- "type": "string",
- "title": "Message"
- },
- "exception": {
- "type": "string",
- "title": "Exception"
- },
- "file": {
- "anyOf": [
- {
- "type": "string",
- "format": "path"
- },
- {
- "type": "string"
- }
- ],
- "title": "File"
- },
- "line": {
- "type": "integer",
- "title": "Line"
- },
- "trace": {
- "items": {},
- "type": "array",
- "title": "Trace"
- }
- },
- "type": "object",
- "required": [
- "message"
- ],
- "title": "ErrorModel"
- },
- "GetPaymentMethod": {
- "properties": {
- "id": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Id"
- },
- "card_holder_name": {
- "type": "string",
- "title": "Card Holder Name"
- },
- "card_number_masked": {
- "type": "string",
- "title": "Card Number Masked"
- },
- "card_type": {
- "type": "string",
- "title": "Card Type"
- },
- "expiration_month": {
- "type": "integer",
- "title": "Expiration Month"
- },
- "expiration_year": {
- "type": "integer",
- "title": "Expiration Year"
- },
- "created": {
- "type": "string",
- "format": "date-time",
- "title": "Created"
- }
- },
- "type": "object",
- "required": [
- "id",
- "created"
- ],
- "title": "GetPaymentMethod"
- },
- "InitPayment": {
- "properties": {
- "amount_dollars": {
- "type": "number",
- "exclusiveMaximum": true,
- "exclusiveMinimum": true,
- "title": "Amount Dollars",
- "maximum": 1000000.0,
- "minimum": 0.0
- },
- "credits": {
- "type": "number",
- "exclusiveMaximum": true,
- "exclusiveMinimum": true,
- "title": "Credits",
- "maximum": 1000000.0,
- "minimum": 0.0
- },
- "user_name": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "User Name"
- },
- "user_email": {
- "type": "string",
- "format": "email",
- "title": "User Email"
- },
- "wallet_name": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Wallet Name"
- }
- },
- "additionalProperties": false,
- "type": "object",
- "required": [
- "amount_dollars",
- "credits",
- "user_name",
- "user_email",
- "wallet_name"
- ],
- "title": "InitPayment"
- },
- "InitPaymentMethod": {
- "properties": {
- "method": {
- "type": "string",
- "enum": [
- "CC"
- ],
- "title": "Method",
- "default": "CC"
- },
- "user_name": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "User Name"
- },
- "user_email": {
- "type": "string",
- "format": "email",
- "title": "User Email"
- },
- "wallet_name": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Wallet Name"
- }
- },
- "additionalProperties": false,
- "type": "object",
- "required": [
- "user_name",
- "user_email",
- "wallet_name"
- ],
- "title": "InitPaymentMethod"
- },
- "PaymentCancelled": {
- "properties": {
- "message": {
- "type": "string",
- "title": "Message"
- }
- },
- "type": "object",
- "title": "PaymentCancelled"
- },
- "PaymentInitiated": {
- "properties": {
- "payment_id": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Payment Id"
- }
- },
- "type": "object",
- "required": [
- "payment_id"
- ],
- "title": "PaymentInitiated"
- },
- "PaymentMethodInitiated": {
- "properties": {
- "payment_method_id": {
- "type": "string",
- "maxLength": 50,
- "minLength": 1,
- "title": "Payment Method Id"
- }
- },
- "type": "object",
- "required": [
- "payment_method_id"
- ],
- "title": "PaymentMethodInitiated"
- },
- "PaymentMethodsBatch": {
- "properties": {
- "items": {
- "items": {
- "$ref": "#/components/schemas/GetPaymentMethod"
- },
- "type": "array",
- "title": "Items"
- }
- },
- "type": "object",
- "required": [
- "items"
- ],
- "title": "PaymentMethodsBatch"
- }
- }
- }
-}
diff --git a/services/payments/requirements/_base.in b/services/payments/requirements/_base.in
index 602ea897f9e..5ef19fb42d5 100644
--- a/services/payments/requirements/_base.in
+++ b/services/payments/requirements/_base.in
@@ -20,5 +20,6 @@ httpx
packaging
python-jose
python-multipart
+python-socketio
typer[all]
uvicorn[standard]
diff --git a/services/payments/requirements/_base.txt b/services/payments/requirements/_base.txt
index c1f4db27fa6..9e1ab68c35a 100644
--- a/services/payments/requirements/_base.txt
+++ b/services/payments/requirements/_base.txt
@@ -62,6 +62,8 @@ attrs==23.1.0
# aiohttp
# jsonschema
# referencing
+bidict==0.22.1
+ # via python-socketio
certifi==2023.7.22
# via
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
@@ -131,6 +133,7 @@ h11==0.14.0
# via
# httpcore
# uvicorn
+ # wsproto
httpcore==0.18.0
# via httpx
httptools==0.6.1
@@ -233,10 +236,14 @@ python-dateutil==2.8.2
# via arrow
python-dotenv==1.0.0
# via uvicorn
+python-engineio==4.8.0
+ # via python-socketio
python-jose==3.3.0
# via -r requirements/_base.in
python-multipart==0.0.6
# via -r requirements/_base.in
+python-socketio==5.10.0
+ # via -r requirements/_base.in
pyyaml==6.0.1
# via
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
@@ -297,6 +304,8 @@ rsa==4.9
# python-jose
shellingham==1.5.4
# via typer
+simple-websocket==1.0.0
+ # via python-engineio
six==1.16.0
# via
# ecdsa
@@ -372,6 +381,8 @@ watchfiles==0.21.0
# via uvicorn
websockets==12.0
# via uvicorn
+wsproto==1.2.0
+ # via simple-websocket
yarl==1.9.2
# via
# -r requirements/../../../packages/postgres-database/requirements/_base.in
diff --git a/services/payments/requirements/_test.in b/services/payments/requirements/_test.in
index b4f4311171e..e67da6f24e9 100644
--- a/services/payments/requirements/_test.in
+++ b/services/payments/requirements/_test.in
@@ -17,6 +17,7 @@ docker
faker
jsonref
pytest
+pytest-aiohttp
pytest-asyncio
pytest-cov
pytest-icdiff
@@ -24,4 +25,5 @@ pytest-mock
pytest-runner
pytest-sugar
python-dotenv
+python-socketio[asyncio_client]
respx
diff --git a/services/payments/requirements/_test.txt b/services/payments/requirements/_test.txt
index b0774954946..9b76509ff98 100644
--- a/services/payments/requirements/_test.txt
+++ b/services/payments/requirements/_test.txt
@@ -4,12 +4,34 @@
#
# pip-compile --output-file=requirements/_test.txt --strip-extras requirements/_test.in
#
+aiohttp==3.8.6
+ # via
+ # -c requirements/../../../requirements/constraints.txt
+ # -c requirements/_base.txt
+ # pytest-aiohttp
+ # python-socketio
+aiosignal==1.3.1
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
anyio==4.0.0
# via
# -c requirements/_base.txt
# httpcore
asgi-lifespan==2.1.0
# via -r requirements/_test.in
+async-timeout==4.0.3
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
+attrs==23.1.0
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
+bidict==0.22.1
+ # via
+ # -c requirements/_base.txt
+ # python-socketio
certifi==2023.7.22
# via
# -c requirements/../../../requirements/constraints.txt
@@ -20,6 +42,7 @@ certifi==2023.7.22
charset-normalizer==3.3.2
# via
# -c requirements/_base.txt
+ # aiohttp
# requests
coverage==7.3.2
# via
@@ -34,10 +57,16 @@ exceptiongroup==1.1.3
# pytest
faker==19.13.0
# via -r requirements/_test.in
+frozenlist==1.4.0
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
+ # aiosignal
h11==0.14.0
# via
# -c requirements/_base.txt
# httpcore
+ # wsproto
httpcore==0.18.0
# via
# -c requirements/_base.txt
@@ -55,10 +84,16 @@ idna==3.4
# anyio
# httpx
# requests
+ # yarl
iniconfig==2.0.0
# via pytest
jsonref==1.1.0
# via -r requirements/_test.in
+multidict==6.0.4
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
+ # yarl
packaging==23.2
# via
# -c requirements/_base.txt
@@ -72,13 +107,18 @@ pprintpp==0.4.0
pytest==7.4.3
# via
# -r requirements/_test.in
+ # pytest-aiohttp
# pytest-asyncio
# pytest-cov
# pytest-icdiff
# pytest-mock
# pytest-sugar
-pytest-asyncio==0.21.1
+pytest-aiohttp==1.0.5
# via -r requirements/_test.in
+pytest-asyncio==0.21.1
+ # via
+ # -r requirements/_test.in
+ # pytest-aiohttp
pytest-cov==4.1.0
# via -r requirements/_test.in
pytest-icdiff==0.8
@@ -97,10 +137,22 @@ python-dotenv==1.0.0
# via
# -c requirements/_base.txt
# -r requirements/_test.in
+python-engineio==4.8.0
+ # via
+ # -c requirements/_base.txt
+ # python-socketio
+python-socketio==5.10.0
+ # via
+ # -c requirements/_base.txt
+ # -r requirements/_test.in
requests==2.31.0
# via docker
respx==0.20.2
# via -r requirements/_test.in
+simple-websocket==1.0.0
+ # via
+ # -c requirements/_base.txt
+ # python-engineio
six==1.16.0
# via
# -c requirements/_base.txt
@@ -125,3 +177,11 @@ urllib3==2.0.7
# requests
websocket-client==1.6.4
# via docker
+wsproto==1.2.0
+ # via
+ # -c requirements/_base.txt
+ # simple-websocket
+yarl==1.9.2
+ # via
+ # -c requirements/_base.txt
+ # aiohttp
diff --git a/services/payments/gateway/example_payment_gateway.py b/services/payments/scripts/example_payment_gateway.py
old mode 100644
new mode 100755
similarity index 69%
rename from services/payments/gateway/example_payment_gateway.py
rename to services/payments/scripts/example_payment_gateway.py
index e43df1d6596..19f28790e86
--- a/services/payments/gateway/example_payment_gateway.py
+++ b/services/payments/scripts/example_payment_gateway.py
@@ -1,18 +1,27 @@
+#!/usr/bin/env python
+
+# pylint: disable=protected-access
+# pylint: disable=redefined-builtin
+# pylint: disable=redefined-outer-name
+# pylint: disable=too-many-arguments
+# pylint: disable=unused-argument
+# pylint: disable=unused-variable
+
""" This is a simple example of a payments-gateway service
- Mainly used to create the openapi specs (SEE `openapi.json`) that the payments service expects
- Also used as a fake payment-gateway for manual exploratory testing
"""
+
import argparse
+import datetime
import json
import logging
import types
-from contextlib import asynccontextmanager
from dataclasses import dataclass
-from pathlib import Path
from typing import Annotated, Any, cast
-from uuid import UUID, uuid4
+from uuid import uuid4
import httpx
import uvicorn
@@ -29,7 +38,7 @@
from fastapi.encoders import jsonable_encoder
from fastapi.responses import HTMLResponse
from fastapi.routing import APIRoute
-from pydantic import HttpUrl, SecretStr, parse_file_as
+from pydantic import HttpUrl, SecretStr
from servicelib.fastapi.openapi import override_fastapi_openapi_method
from settings_library.base import BaseCustomSettings
from simcore_service_payments.models.payments_gateway import (
@@ -47,6 +56,7 @@
)
from simcore_service_payments.models.schemas.acknowledgements import (
AckPayment,
+ AckPaymentMethod,
AckPaymentWithPaymentMethod,
)
from simcore_service_payments.models.schemas.auth import Token
@@ -64,7 +74,7 @@ class Settings(BaseCustomSettings):
PAYMENTS_PASSWORD: SecretStr = "replace-with-password"
-def set_operation_id_as_handler_function_name(router: APIRouter):
+def _set_operation_id_as_handler_function_name(router: APIRouter):
for route in router.routes:
if isinstance(route, APIRoute):
assert isinstance(route.endpoint, types.FunctionType) # nosec
@@ -76,7 +86,7 @@ def set_operation_id_as_handler_function_name(router: APIRouter):
"4XX": {"content": {"text/html": {"schema": {"type": "string"}}}}
}
-PAYMENT_HTML = """
+FORM_HTML = """
@@ -84,7 +94,7 @@ def set_operation_id_as_handler_function_name(router: APIRouter):
Enter Credit Card Information
-
"""
+ERROR_HTTP = """
+
+
+
+ Error {0}
+
+
+ {0}
+
+
+"""
+
@dataclass
class PaymentForm:
@@ -145,19 +167,49 @@ def auth_flow(self, request):
yield request
+async def ack_payment(id_: PaymentID, acked: AckPayment, settings: Settings):
+ async with httpx.AsyncClient() as client:
+ await client.post(
+ f"{settings.PAYMENTS_SERVICE_API_BASE_URL}/v1/payments/{id_}:ack",
+ json=acked.dict(),
+ auth=PaymentsAuth(
+ username=settings.PAYMENTS_USERNAME,
+ password=settings.PAYMENTS_PASSWORD.get_secret_value(),
+ ),
+ )
+
+
+async def ack_payment_method(
+ id_: PaymentMethodID, acked: AckPaymentMethod, settings: Settings
+):
+ async with httpx.AsyncClient() as client:
+ await client.post(
+ f"{settings.PAYMENTS_SERVICE_API_BASE_URL}/v1/payments-methods/{id_}:ack",
+ json=acked.dict(),
+ auth=PaymentsAuth(
+ username=settings.PAYMENTS_USERNAME,
+ password=settings.PAYMENTS_PASSWORD.get_secret_value(),
+ ),
+ )
+
+
#
# Dependencies
#
-def get_payments(request: Request) -> dict[str, Any]:
- return request.app.state.payments
-
-
def get_settings(request: Request) -> Settings:
return cast(Settings, request.app.state.settings)
+def auth_session(x_init_api_secret: Annotated[str | None, Header()] = None) -> int:
+ if x_init_api_secret is None:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED, detail="api secret missing"
+ )
+ return 1
+
+
#
# Router factories
#
@@ -176,34 +228,27 @@ def create_payment_router():
response_model=PaymentInitiated,
responses=ERROR_RESPONSES,
)
- def init_payment(
+ async def init_payment(
payment: InitPayment,
auth: Annotated[int, Depends(auth_session)],
- all_payments: Annotated[dict[UUID, Any], Depends(get_payments)],
):
assert payment # nosec
assert auth # nosec
- payment_id = uuid4()
- all_payments[payment_id] = {"init": InitPayment}
-
- return PaymentInitiated(payment_id=payment_id)
+ id_ = f"{uuid4()}"
+ return PaymentInitiated(payment_id=id_)
@router.get(
"/pay",
response_class=HTMLResponse,
responses=ERROR_HTML_RESPONSES,
)
- def get_payment_form(
+ async def get_payment_form(
id: PaymentID,
- all_payments: Annotated[dict[UUID, Any], Depends(get_payments)],
):
assert id # nosec
- if id not in all_payments:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
-
- return PAYMENT_HTML.format(f"{id}")
+ return FORM_HTML.format(f"/pay?id={id}", "Submit Payment")
@router.post(
"/pay",
@@ -211,55 +256,37 @@ def get_payment_form(
responses=ERROR_RESPONSES,
include_in_schema=False,
)
- def pay(
+ async def pay(
id: PaymentID,
payment_form: Annotated[PaymentForm, Depends()],
- all_payments: Annotated[dict[UUID, Any], Depends(get_payments)],
settings: Annotated[Settings, Depends(get_settings)],
):
- assert id # nosec
-
- if id not in all_payments:
- raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
-
- all_payments[id]["form"] = payment_form
-
- # request ACK
- httpx.post(
- f"{settings.PAYMENTS_SERVICE_API_BASE_URL}/v1/payments/{id}:ack",
- json=AckPayment.Config.schema_extra["example"], # one-time success
- auth=PaymentsAuth(
- username=settings.PAYMENTS_USERNAME,
- password=settings.PAYMENTS_PASSWORD.get_secret_value(),
- ),
+ """WARNING: this is only for faking pay. DO NOT EXPOSE TO openapi.json"""
+ acked = AckPayment(
+ success=True,
+ message=f"Fake Payment {id}",
+ invoice_url="https://fakeimg.pl/300/",
+ saved=None,
)
+ await ack_payment(id_=id, acked=acked, settings=settings)
@router.post(
"/cancel",
response_model=PaymentCancelled,
responses=ERROR_RESPONSES,
)
- def cancel_payment(
+ async def cancel_payment(
payment: PaymentInitiated,
auth: Annotated[int, Depends(auth_session)],
- all_payments: Annotated[dict[UUID, Any], Depends(get_payments)],
):
assert payment # nosec
assert auth # nosec
- try:
- all_payments[payment.payment_id] = "CANCELLED"
- return PaymentCancelled(message="CANCELLED")
- except KeyError as exc:
- raise HTTPException(status.HTTP_404_NOT_FOUND) from exc
+ return PaymentCancelled(message=f"CANCELLED {payment.payment_id}")
return router
-def auth_session(x_init_api_secret: Annotated[str | None, Header()] = None) -> int:
- return 1 if x_init_api_secret is not None else 0
-
-
def create_payment_method_router():
router = APIRouter(
prefix="/payment-methods",
@@ -274,20 +301,44 @@ def create_payment_method_router():
response_model=PaymentMethodInitiated,
responses=ERROR_RESPONSES,
)
- def init_payment_method(
+ async def init_payment_method(
payment_method: InitPaymentMethod,
auth: Annotated[int, Depends(auth_session)],
):
assert payment_method # nosec
assert auth # nosec
+ id_ = f"{uuid4()}"
+ return PaymentMethodInitiated(payment_method_id=id_)
+
@router.get(
"/form",
response_class=HTMLResponse,
responses=ERROR_HTML_RESPONSES,
)
- def get_form_payment_method(id: PaymentMethodID):
- assert id # nosec
+ async def get_form_payment_method(
+ id: PaymentMethodID,
+ ):
+ return FORM_HTML.format(f"/save?id={id}", "Save Payment")
+
+ @router.post(
+ "/save",
+ response_class=HTMLResponse,
+ responses=ERROR_RESPONSES,
+ include_in_schema=False,
+ )
+ async def save(
+ id: PaymentMethodID,
+ payment_form: Annotated[PaymentForm, Depends()],
+ settings: Annotated[Settings, Depends(get_settings)],
+ ):
+ """WARNING: this is only for faking save. DO NOT EXPOSE TO openapi.json"""
+ card_number_masked = f"**** **** **** {payment_form.card_number[-4:]}"
+ acked = AckPaymentMethod(
+ success=True,
+ message=f"Fake Payment-method saved {card_number_masked}",
+ )
+ await ack_payment_method(id_=id, acked=acked, settings=settings)
# CRUD payment-methods
@router.post(
@@ -295,12 +346,20 @@ def get_form_payment_method(id: PaymentMethodID):
response_model=PaymentMethodsBatch,
responses=ERROR_RESPONSES,
)
- def batch_get_payment_methods(
+ async def batch_get_payment_methods(
batch: BatchGetPaymentMethods,
auth: Annotated[int, Depends(auth_session)],
):
assert auth # nosec
assert batch # nosec
+ return PaymentMethodsBatch(
+ items=[
+ GetPaymentMethod(
+ id=id_, created=datetime.datetime.now(tz=datetime.timezone.utc)
+ )
+ for id_ in batch.payment_methods_ids
+ ]
+ )
@router.get(
"/{id}",
@@ -313,19 +372,23 @@ def batch_get_payment_methods(
**ERROR_RESPONSES,
},
)
- def get_payment_method(
+ async def get_payment_method(
id: PaymentMethodID,
auth: Annotated[int, Depends(auth_session)],
):
assert id # nosec
assert auth # nosec
+ return GetPaymentMethod(
+ id=id, created=datetime.datetime.now(tz=datetime.timezone.utc)
+ )
+
@router.delete(
"/{id}",
status_code=status.HTTP_204_NO_CONTENT,
responses=ERROR_RESPONSES,
)
- def delete_payment_method(
+ async def delete_payment_method(
id: PaymentMethodID,
auth: Annotated[int, Depends(auth_session)],
):
@@ -337,7 +400,7 @@ def delete_payment_method(
response_model=AckPaymentWithPaymentMethod,
responses=ERROR_RESPONSES,
)
- def pay_with_payment_method(
+ async def pay_with_payment_method(
id: PaymentMethodID,
payment: InitPayment,
auth: Annotated[int, Depends(auth_session)],
@@ -346,31 +409,25 @@ def pay_with_payment_method(
assert payment # nosec
assert auth # nosec
- return router
-
-
-@asynccontextmanager
-async def _app_lifespan(app: FastAPI):
- state_path = Path("app.state.payments.ignore.json")
- if state_path.exists():
- app.state.payments = parse_file_as(dict[str, Any], state_path)
-
- yield
+ return AckPaymentWithPaymentMethod( # nosec
+ success=True,
+ invoice_url="https://fakeimg.pl/300/",
+ payment_id=f"{uuid4()}",
+ message=f"Payed with payment-method {id}",
+ )
- state_path.write_text(json.dumps(jsonable_encoder(app.state.payments), indent=1))
+ return router # nosec
def create_app():
app = FastAPI(
title="osparc-compliant payment-gateway",
version=PAYMENTS_GATEWAY_SPECS_VERSION,
- lifespan=_app_lifespan,
debug=True,
)
app.openapi_version = "3.0.0" # NOTE: small hack to allow current version of `42Crunch.vscode-openapi` to work with openapi
override_fastapi_openapi_method(app)
- app.state.payments = {}
app.state.settings = Settings.create_from_envs()
logging.info(app.state.settings.json(indent=2))
@@ -379,7 +436,7 @@ def create_app():
create_payment_method_router,
):
router = factory()
- set_operation_id_as_handler_function_name(router)
+ _set_operation_id_as_handler_function_name(router)
app.include_router(router)
return app
diff --git a/services/payments/setup.py b/services/payments/setup.py
index 569209d1bf6..234334fa2ab 100755
--- a/services/payments/setup.py
+++ b/services/payments/setup.py
@@ -61,6 +61,7 @@ def read_reqs(reqs_path: Path) -> set[str]:
"simcore-service = simcore_service_payments.cli:main",
],
},
+ "scripts": ["scripts/example_payment_gateway.py"],
}
if __name__ == "__main__":
diff --git a/services/payments/src/simcore_service_payments/core/application.py b/services/payments/src/simcore_service_payments/core/application.py
index 815355b1cf9..f4a8397193c 100644
--- a/services/payments/src/simcore_service_payments/core/application.py
+++ b/services/payments/src/simcore_service_payments/core/application.py
@@ -1,5 +1,6 @@
from fastapi import FastAPI
from servicelib.fastapi.openapi import override_fastapi_openapi_method
+from simcore_service_payments.services.socketio import setup_socketio
from .._meta import (
API_VERSION,
@@ -54,6 +55,7 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
# Listening to Rabbitmq
setup_auto_recharge_listener(app)
+ setup_socketio(app)
# ERROR HANDLERS
# ... add here ...
diff --git a/services/payments/src/simcore_service_payments/services/payments.py b/services/payments/src/simcore_service_payments/services/payments.py
index db617bcd052..eb91c5e93ba 100644
--- a/services/payments/src/simcore_service_payments/services/payments.py
+++ b/services/payments/src/simcore_service_payments/services/payments.py
@@ -155,7 +155,6 @@ async def on_payment_completed(
if notify_enabled:
_logger.debug(
"Notify front-end of payment -> sio SOCKET_IO_PAYMENT_COMPLETED_EVENT "
- "socketio.notify_payment_completed(sio, user_primary_group_id=gid, payment=transaction)"
)
if transaction.state == PaymentTransactionState.SUCCESS:
diff --git a/services/payments/src/simcore_service_payments/services/rabbitmq.py b/services/payments/src/simcore_service_payments/services/rabbitmq.py
index 8938b52cbc2..ecb0ba8f382 100644
--- a/services/payments/src/simcore_service_payments/services/rabbitmq.py
+++ b/services/payments/src/simcore_service_payments/services/rabbitmq.py
@@ -13,8 +13,13 @@
_logger = logging.getLogger(__name__)
-def setup_rabbitmq(app: FastAPI) -> None:
+def get_rabbitmq_settings(app: FastAPI) -> RabbitSettings:
settings: RabbitSettings = app.state.settings.PAYMENTS_RABBITMQ
+ return settings
+
+
+def setup_rabbitmq(app: FastAPI) -> None:
+ settings: RabbitSettings = get_rabbitmq_settings(app)
app.state.rabbitmq_client = None
app.state.rabbitmq_rpc_server = None
diff --git a/services/payments/src/simcore_service_payments/services/socketio.py b/services/payments/src/simcore_service_payments/services/socketio.py
new file mode 100644
index 00000000000..ba841739803
--- /dev/null
+++ b/services/payments/src/simcore_service_payments/services/socketio.py
@@ -0,0 +1,94 @@
+import logging
+from dataclasses import dataclass
+
+import socketio
+from fastapi import FastAPI
+from fastapi.encoders import jsonable_encoder
+from models_library.api_schemas_payments.socketio import (
+ SOCKET_IO_PAYMENT_COMPLETED_EVENT,
+ SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
+)
+from models_library.api_schemas_webserver.wallets import (
+ PaymentMethodTransaction,
+ PaymentTransaction,
+)
+from models_library.users import GroupID
+from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager
+from settings_library.rabbit import RabbitSettings
+
+from .rabbitmq import get_rabbitmq_settings
+
+_logger = logging.getLogger(__name__)
+
+
+@dataclass
+class Notifier:
+ _sio_manager: socketio.AsyncAioPikaManager
+
+ async def notify_payment_completed(
+ self,
+ user_primary_group_id: GroupID,
+ payment: PaymentTransaction,
+ ):
+ # NOTE: We assume that the user has been added to all
+ # rooms associated to his groups
+ assert payment.completed_at is not None # nosec
+
+ return await self._sio_manager.emit(
+ SOCKET_IO_PAYMENT_COMPLETED_EVENT,
+ data=jsonable_encoder(payment, by_alias=True),
+ room=f"{user_primary_group_id}",
+ )
+
+ async def notify_payment_method_acked(
+ self,
+ user_primary_group_id: GroupID,
+ payment_method: PaymentMethodTransaction,
+ ):
+ return await self._sio_manager.emit(
+ SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
+ data=jsonable_encoder(payment_method, by_alias=True),
+ room=f"{user_primary_group_id}",
+ )
+
+
+def setup_socketio(app: FastAPI):
+ settings: RabbitSettings = get_rabbitmq_settings(app)
+
+ async def _on_startup() -> None:
+ assert app.state.rabbitmq_client # nosec
+
+ #
+ # https://python-socketio.readthedocs.io/en/stable/server.html#emitting-from-external-processes
+ #
+ # Connect to the as an external process in write-only mode
+ #
+ app.state.external_socketio = socketio.AsyncAioPikaManager(
+ url=settings.dsn, logger=_logger, write_only=True
+ )
+
+ # NOTE: this might be moved somewhere else when notifier incorporates emails etc
+ app.state.notifier = Notifier(_sio_manager=app.state.external_socketio)
+
+ async def _on_shutdown() -> None:
+ if app.state.external_socketio:
+ await cleanup_socketio_async_pubsub_manager(
+ server_manager=app.state.external_socketio
+ )
+
+ app.add_event_handler("startup", _on_startup)
+ app.add_event_handler("shutdown", _on_shutdown)
+
+
+async def notify_payment_completed(
+ app: FastAPI,
+ *,
+ user_primary_group_id: GroupID,
+ payment: PaymentTransaction,
+):
+
+ notifier: Notifier = app.state.notifier
+
+ return await notifier.notify_payment_completed(
+ user_primary_group_id=user_primary_group_id, payment=payment
+ )
diff --git a/services/payments/tests/unit/conftest.py b/services/payments/tests/unit/conftest.py
index 7f75753186c..729f9b93f7a 100644
--- a/services/payments/tests/unit/conftest.py
+++ b/services/payments/tests/unit/conftest.py
@@ -57,15 +57,16 @@
@pytest.fixture
def disable_rabbitmq_and_rpc_setup(mocker: MockerFixture) -> Callable:
- def _do():
+ def _():
# The following services are affected if rabbitmq is not in place
+ mocker.patch("simcore_service_payments.core.application.setup_socketio")
mocker.patch("simcore_service_payments.core.application.setup_rabbitmq")
mocker.patch("simcore_service_payments.core.application.setup_rpc_api_routes")
mocker.patch(
"simcore_service_payments.core.application.setup_auto_recharge_listener"
)
- return _do
+ return _
@pytest.fixture
@@ -92,7 +93,7 @@ def _setup(app: FastAPI):
Mock()
) # NOTE: avoids error in api._dependencies::get_db_engine
- def _do():
+ def _():
# The following services are affected if postgres is not in place
mocker.patch(
"simcore_service_payments.core.application.setup_postgres",
@@ -100,7 +101,7 @@ def _do():
side_effect=_setup,
)
- return _do
+ return _
@pytest.fixture
diff --git a/services/payments/tests/unit/test_services_socketio.py b/services/payments/tests/unit/test_services_socketio.py
new file mode 100644
index 00000000000..3f32e1aa957
--- /dev/null
+++ b/services/payments/tests/unit/test_services_socketio.py
@@ -0,0 +1,262 @@
+# pylint: disable=redefined-outer-name
+# pylint: disable=unused-argument
+# pylint: disable=unused-variable
+# pylint: disable=too-many-arguments
+
+
+import asyncio
+import threading
+from collections.abc import AsyncIterable, Callable
+from typing import AsyncIterator
+from unittest.mock import AsyncMock
+
+import arrow
+import pytest
+import socketio
+from aiohttp import web
+from aiohttp.test_utils import TestServer
+from faker import Faker
+from fastapi import FastAPI
+from models_library.api_schemas_payments.socketio import (
+ SOCKET_IO_PAYMENT_COMPLETED_EVENT,
+)
+from models_library.api_schemas_webserver.wallets import PaymentTransaction
+from models_library.users import GroupID
+from pydantic import parse_obj_as
+from pytest_mock import MockerFixture
+from pytest_simcore.helpers.rawdata_fakers import random_payment_transaction
+from pytest_simcore.helpers.typing_env import EnvVarsDict
+from pytest_simcore.helpers.utils_envs import setenvs_from_dict
+from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager
+from settings_library.rabbit import RabbitSettings
+from simcore_service_payments.models.db import PaymentsTransactionsDB
+from simcore_service_payments.services.rabbitmq import get_rabbitmq_settings
+from simcore_service_payments.services.socketio import notify_payment_completed
+from socketio import AsyncAioPikaManager, AsyncServer
+from tenacity import AsyncRetrying
+from tenacity.stop import stop_after_attempt
+from tenacity.wait import wait_fixed
+from yarl import URL
+
+pytest_simcore_core_services_selection = [
+ "rabbit",
+]
+pytest_simcore_ops_services_selection = []
+
+
+@pytest.fixture
+def app_environment(
+ monkeypatch: pytest.MonkeyPatch,
+ app_environment: EnvVarsDict,
+ with_disabled_postgres: None,
+ rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service
+):
+ # set environs
+ monkeypatch.delenv("PAYMENTS_RABBITMQ", raising=False)
+
+ return setenvs_from_dict(
+ monkeypatch,
+ {
+ **app_environment,
+ **rabbit_env_vars_dict,
+ },
+ )
+
+
+@pytest.fixture
+def user_primary_group_id(faker: Faker) -> GroupID:
+ return parse_obj_as(GroupID, faker.pyint())
+
+
+@pytest.fixture
+async def socketio_server(app: FastAPI) -> AsyncIterable[AsyncServer]:
+ # Same configuration as simcore_service_webserver/socketio/server.py
+ settings: RabbitSettings = get_rabbitmq_settings(app)
+ server_manager = AsyncAioPikaManager(url=settings.dsn)
+
+ server = AsyncServer(
+ async_mode="aiohttp",
+ engineio_logger=True,
+ client_manager=server_manager,
+ )
+
+ yield server
+
+ await cleanup_socketio_async_pubsub_manager(server_manager)
+
+
+@pytest.fixture
+def socketio_server_events(
+ socketio_server: AsyncServer,
+ mocker: MockerFixture,
+ user_primary_group_id: GroupID,
+) -> dict[str, AsyncMock]:
+
+ user_room_name = f"{user_primary_group_id}"
+
+ # handlers
+ async def connect(sid: str, environ):
+ print("connecting", sid)
+ await socketio_server.enter_room(sid, user_room_name)
+
+ async def on_check(sid, data):
+ print("check", sid, data)
+
+ async def on_payment(sid, data):
+ print("payment", sid, parse_obj_as(PaymentTransaction, data))
+
+ async def disconnect(sid: str):
+ print("disconnecting", sid)
+ await socketio_server.leave_room(sid, user_room_name)
+
+ # spies
+ spy_connect = mocker.AsyncMock(wraps=connect)
+ socketio_server.on("connect", spy_connect)
+
+ spy_on_payment = mocker.AsyncMock(wraps=on_payment)
+ socketio_server.on(SOCKET_IO_PAYMENT_COMPLETED_EVENT, spy_on_payment)
+
+ spy_on_check = mocker.AsyncMock(wraps=on_check)
+ socketio_server.on("check", spy_on_check)
+
+ spy_disconnect = mocker.AsyncMock(wraps=disconnect)
+ socketio_server.on("disconnect", spy_disconnect)
+
+ return {
+ connect.__name__: spy_connect,
+ disconnect.__name__: spy_disconnect,
+ on_check.__name__: spy_on_check,
+ on_payment.__name__: spy_on_payment,
+ }
+
+
+@pytest.fixture
+async def web_server(
+ socketio_server: AsyncServer, aiohttp_unused_port: Callable
+) -> AsyncIterator[URL]:
+ """
+ this emulates the webserver setup: socketio server with
+ an aiopika manager that attaches an aiohttp web app
+ """
+ aiohttp_app = web.Application()
+ socketio_server.attach(aiohttp_app)
+
+ async def _lifespan(
+ server: TestServer, started: asyncio.Event, teardown: asyncio.Event
+ ):
+ # NOTE: this is necessary to avoid blocking comms between client and this server
+ await server.start_server()
+ started.set() # notifies started
+ await teardown.wait() # keeps test0server until needs to close
+ await server.close()
+
+ setup = asyncio.Event()
+ teardown = asyncio.Event()
+
+ server = TestServer(aiohttp_app, port=aiohttp_unused_port())
+ t = asyncio.create_task(_lifespan(server, setup, teardown), name="server-lifespan")
+
+ await setup.wait()
+
+ yield URL(server.make_url("/"))
+
+ assert t
+ teardown.set()
+
+
+@pytest.fixture
+async def server_url(web_server: URL) -> str:
+ return f'{web_server.with_path("/")}'
+
+
+@pytest.fixture
+async def socketio_client(server_url: str) -> AsyncIterable[socketio.AsyncClient]:
+ """This emulates a socketio client in the front-end"""
+ client = socketio.AsyncClient(logger=True, engineio_logger=True)
+ await client.connect(f"{server_url}", transports=["websocket"])
+
+ yield client
+
+ await client.disconnect()
+
+
+@pytest.fixture
+async def socketio_client_events(
+ socketio_client: socketio.AsyncClient,
+) -> dict[str, AsyncMock]:
+ # emulates front-end receiving message
+
+ async def on_payment(data):
+ assert parse_obj_as(PaymentTransaction, data) is not None
+
+ on_event_spy = AsyncMock(wraps=on_payment)
+ socketio_client.on(SOCKET_IO_PAYMENT_COMPLETED_EVENT, on_event_spy)
+
+ return {on_payment.__name__: on_event_spy}
+
+
+@pytest.fixture
+async def notify_payment(app: FastAPI, user_primary_group_id: GroupID) -> Callable:
+ async def _():
+ payment = PaymentsTransactionsDB(
+ **random_payment_transaction(completed_at=arrow.utcnow().datetime)
+ ).to_api_model()
+ await notify_payment_completed(
+ app, user_primary_group_id=user_primary_group_id, payment=payment
+ )
+
+ return _
+
+
+async def test_emit_message_as_external_process_to_frontend_client(
+ socketio_server_events: dict[str, AsyncMock],
+ socketio_client: socketio.AsyncClient,
+ socketio_client_events: dict[str, AsyncMock],
+ notify_payment: Callable,
+):
+ """
+ front-end -> socketio client (many different clients)
+ webserver -> socketio server (one/more replicas)
+ payments -> Sends messages to clients from external processes (one/more replicas)
+ """
+ # Used iusntead of a fix asyncio.sleep
+ context_switch_retry_kwargs = {
+ "wait": wait_fixed(0.1),
+ "stop": stop_after_attempt(5),
+ "reraise": True,
+ }
+
+ # web server spy events
+ server_connect = socketio_server_events["connect"]
+ server_disconnect = socketio_server_events["disconnect"]
+ server_on_check = socketio_server_events["on_check"]
+ server_on_payment = socketio_server_events["on_payment"]
+
+ # client spy events
+ client_on_payment = socketio_client_events["on_payment"]
+
+ # checks
+ assert server_connect.called
+ assert not server_disconnect.called
+
+ # client emits
+ await socketio_client.emit("check", data="hoi")
+
+ async for attempt in AsyncRetrying(**context_switch_retry_kwargs):
+ with attempt:
+ assert server_on_check.called
+
+ # payment server emits
+ def _(lp):
+ asyncio.run_coroutine_threadsafe(notify_payment(), lp)
+
+ threading.Thread(
+ target=_,
+ args=(asyncio.get_event_loop(),),
+ daemon=False,
+ ).start()
+
+ async for attempt in AsyncRetrying(**context_switch_retry_kwargs):
+ with attempt:
+ assert client_on_payment.called
+ assert not server_on_payment.called
diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py
index b79ee8ceeee..5decd463917 100644
--- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py
+++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py
@@ -13,6 +13,7 @@
ProgressType,
WalletCreditsMessage,
)
+from models_library.socketio import SocketMessageDict
from pydantic import parse_raw_as
from servicelib.aiohttp.monitor_services import (
MONITOR_SERVICE_STARTED_LABELS,
@@ -34,7 +35,6 @@
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_PROGRESS_EVENT,
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT,
- SocketMessageDict,
send_group_messages,
send_messages,
)
diff --git a/services/web/server/src/simcore_service_webserver/payments/_socketio.py b/services/web/server/src/simcore_service_webserver/payments/_socketio.py
index faa4f9a2dda..c2a4a68ebaa 100644
--- a/services/web/server/src/simcore_service_webserver/payments/_socketio.py
+++ b/services/web/server/src/simcore_service_webserver/payments/_socketio.py
@@ -1,17 +1,17 @@
from aiohttp import web
+from models_library.api_schemas_payments.socketio import (
+ SOCKET_IO_PAYMENT_COMPLETED_EVENT,
+ SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
+)
from models_library.api_schemas_webserver.wallets import (
PaymentMethodTransaction,
PaymentTransaction,
)
+from models_library.socketio import SocketMessageDict
from models_library.users import UserID
from models_library.utils.fastapi_encoders import jsonable_encoder
-from ..socketio.messages import (
- SOCKET_IO_PAYMENT_COMPLETED_EVENT,
- SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT,
- SocketMessageDict,
- send_messages,
-)
+from ..socketio.messages import send_messages
async def notify_payment_completed(
diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
index e3884eb26ed..98ff556a0ab 100644
--- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py
+++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py
@@ -42,6 +42,7 @@
)
from models_library.services import ServiceKey, ServiceVersion
from models_library.services_resources import ServiceResourcesDict
+from models_library.socketio import SocketMessageDict
from models_library.users import UserID
from models_library.utils.fastapi_encoders import jsonable_encoder
from models_library.wallets import ZERO_CREDITS, WalletID, WalletInfo
@@ -77,7 +78,6 @@
from ..socketio.messages import (
SOCKET_IO_NODE_UPDATED_EVENT,
SOCKET_IO_PROJECT_UPDATED_EVENT,
- SocketMessageDict,
send_group_messages,
send_messages,
)
diff --git a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py
index ef8d6b563f2..ad1eacca7ec 100644
--- a/services/web/server/src/simcore_service_webserver/socketio/_handlers.py
+++ b/services/web/server/src/simcore_service_webserver/socketio/_handlers.py
@@ -8,6 +8,7 @@
from typing import Any
from aiohttp import web
+from models_library.socketio import SocketMessageDict
from models_library.users import UserID
from servicelib.aiohttp.observer import emit
from servicelib.logging_utils import get_log_record_extra, log_context
@@ -18,7 +19,7 @@
from ..login.decorators import login_required
from ..resource_manager.user_sessions import managed_resource
from ._utils import EnvironDict, SocketID, get_socket_server, register_socketio_handler
-from .messages import SOCKET_IO_HEARTBEAT_EVENT, SocketMessageDict, send_messages
+from .messages import SOCKET_IO_HEARTBEAT_EVENT, send_messages
_logger = logging.getLogger(__name__)
diff --git a/services/web/server/src/simcore_service_webserver/socketio/messages.py b/services/web/server/src/simcore_service_webserver/socketio/messages.py
index 14130602100..f76d4545b69 100644
--- a/services/web/server/src/simcore_service_webserver/socketio/messages.py
+++ b/services/web/server/src/simcore_service_webserver/socketio/messages.py
@@ -4,9 +4,10 @@
import logging
from collections.abc import Sequence
-from typing import Any, Final, TypedDict
+from typing import Final
from aiohttp.web import Application
+from models_library.socketio import SocketMessageDict
from models_library.users import UserID
from servicelib.aiohttp.application_keys import APP_FIRE_AND_FORGET_TASKS_KEY
from servicelib.json_serialization import json_dumps
@@ -18,23 +19,20 @@
_logger = logging.getLogger(__name__)
+
+#
+# List of socket-io event names
+#
SOCKET_IO_EVENT: Final[str] = "event"
SOCKET_IO_HEARTBEAT_EVENT: Final[str] = "set_heartbeat_emit_interval"
SOCKET_IO_LOG_EVENT: Final[str] = "logger"
SOCKET_IO_NODE_PROGRESS_EVENT: Final[str] = "nodeProgress"
SOCKET_IO_NODE_UPDATED_EVENT: Final[str] = "nodeUpdated"
-SOCKET_IO_PAYMENT_COMPLETED_EVENT: Final[str] = "paymentCompleted"
-SOCKET_IO_PAYMENT_METHOD_ACKED_EVENT: Final[str] = "paymentMethodAcknoledged"
SOCKET_IO_PROJECT_PROGRESS_EVENT: Final[str] = "projectProgress"
SOCKET_IO_PROJECT_UPDATED_EVENT: Final[str] = "projectStateUpdated"
SOCKET_IO_WALLET_OSPARC_CREDITS_UPDATED_EVENT: Final[str] = "walletOsparcCreditsUpdated"
-class SocketMessageDict(TypedDict):
- event_type: str
- data: dict[str, Any]
-
-
async def send_messages(
app: Application, user_id: UserID, messages: Sequence[SocketMessageDict]
) -> None:
diff --git a/services/web/server/src/simcore_service_webserver/socketio/server.py b/services/web/server/src/simcore_service_webserver/socketio/server.py
index 693f95d404b..175604e30ad 100644
--- a/services/web/server/src/simcore_service_webserver/socketio/server.py
+++ b/services/web/server/src/simcore_service_webserver/socketio/server.py
@@ -1,8 +1,8 @@
-import asyncio
import logging
-from typing import AsyncIterator
+from collections.abc import AsyncIterator
from aiohttp import web
+from servicelib.socketio_utils import cleanup_socketio_async_pubsub_manager
from socketio import AsyncAioPikaManager, AsyncServer
from ..rabbitmq_settings import get_plugin_settings as get_rabbitmq_settings
@@ -32,37 +32,10 @@ async def _socketio_server_cleanup_ctx(app: web.Application) -> AsyncIterator[No
app[APP_CLIENT_SOCKET_SERVER_KEY] = sio_server
register_socketio_handlers(app, _handlers)
- yield
-
- # NOTE: this is ugly. It seems though that python-socketio does not
- # cleanup its background tasks properly.
- # https://github.com/miguelgrinberg/python-socketio/discussions/1092
- cancelled_tasks = []
- if hasattr(server_manager, "thread"):
- server_thread = server_manager.thread
- assert isinstance(server_thread, asyncio.Task) # nosec
- server_thread.cancel()
- cancelled_tasks.append(server_thread)
- if server_manager.publisher_channel:
- await server_manager.publisher_channel.close()
- if server_manager.publisher_connection:
- await server_manager.publisher_connection.close()
- current_tasks = asyncio.tasks.all_tasks()
+ yield
- for task in current_tasks:
- coro = task.get_coro()
- if any(
- coro_name in coro.__qualname__ # type: ignore
- for coro_name in [
- "AsyncServer._service_task",
- "AsyncSocket.schedule_ping",
- "AsyncPubSubManager._thread",
- ]
- ):
- task.cancel()
- cancelled_tasks.append(task)
- await asyncio.gather(*cancelled_tasks, return_exceptions=True)
+ await cleanup_socketio_async_pubsub_manager(server_manager)
def setup_socketio_server(app: web.Application) -> None:
diff --git a/services/web/server/src/simcore_service_webserver/wallets/_payments_handlers.py b/services/web/server/src/simcore_service_webserver/wallets/_payments_handlers.py
index 0efde369a95..7b5cec05172 100644
--- a/services/web/server/src/simcore_service_webserver/wallets/_payments_handlers.py
+++ b/services/web/server/src/simcore_service_webserver/wallets/_payments_handlers.py
@@ -1,3 +1,4 @@
+import asyncio
import logging
from aiohttp import web
@@ -299,6 +300,9 @@ async def _delete_payment_method(request: web.Request):
return web.HTTPNoContent(content_type=MIMETYPE_APPLICATION_JSON)
+_TINY_WAIT_TO_TRIGGER_CONTEXT_SWITCH = 0.1
+
+
@routes.post(
f"/{VTAG}/wallets/{{wallet_id}}/payments-methods/{{payment_method_id}}:pay",
name="pay_with_payment_method",
@@ -342,8 +346,15 @@ async def _pay_with_payment_method(request: web.Request):
# we decided not to change the return value to avoid changing the front-end logic
# instead we emulate a init-prompt-ack workflow by firing a background task that acks payment
+ async def _notify_payment_completed_after_response(app, user_id, payment):
+ # NOTE: A small delay to send notification just after the response
+ await asyncio.sleep(_TINY_WAIT_TO_TRIGGER_CONTEXT_SWITCH)
+ return (
+ await notify_payment_completed(app, user_id=user_id, payment=payment),
+ )
+
fire_and_forget_task(
- notify_payment_completed(
+ _notify_payment_completed_after_response(
request.app, user_id=req_ctx.user_id, payment=payment
),
task_suffix_name=f"{__name__}._pay_with_payment_method",
diff --git a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py
index c18ce0973dc..67b673b6aa2 100644
--- a/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py
+++ b/services/web/server/tests/unit/with_dbs/03/wallets/payments/test_payments_methods.py
@@ -4,6 +4,7 @@
# pylint: disable=too-many-arguments
+import asyncio
from decimal import Decimal
from unittest.mock import MagicMock
@@ -376,7 +377,8 @@ async def test_one_time_payment_with_payment_method(
assert mock_rut_add_credits_to_wallet.called
mock_rut_add_credits_to_wallet.assert_called_once()
- # check notification (fake)
+ # check notification after response
+ await asyncio.sleep(0.1)
assert send_message.called
send_message.assert_called_once()