Skip to content

Commit 6c947eb

Browse files
authored
♻️🗑️ api-server upgraded to use asyncpg (#7598)
1 parent 168a1d6 commit 6c947eb

File tree

21 files changed

+244
-373
lines changed

21 files changed

+244
-373
lines changed

services/api-server/requirements/_base.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
--requirement ../../../packages/service-library/requirements/_fastapi.in
1818

1919
aiofiles
20-
aiopg[sa]
2120
cryptography
2221
fastapi-pagination
2322
fastapi[all]

services/api-server/requirements/_base.txt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ aiohttp==3.11.10
5555
# -c requirements/../../../requirements/constraints.txt
5656
# -r requirements/../../../packages/simcore-sdk/requirements/_base.in
5757
# aiodocker
58-
aiopg==1.4.0
59-
# via -r requirements/_base.in
6058
aiormq==6.8.1
6159
# via aio-pika
6260
aiosignal==1.3.1
@@ -84,8 +82,6 @@ arrow==1.3.0
8482
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
8583
asgiref==3.8.1
8684
# via opentelemetry-instrumentation-asgi
87-
async-timeout==4.0.3
88-
# via aiopg
8985
asyncpg==0.30.0
9086
# via sqlalchemy
9187
attrs==24.2.0
@@ -513,9 +509,7 @@ psutil==6.1.0
513509
# -r requirements/../../../packages/service-library/requirements/_base.in
514510
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
515511
psycopg2-binary==2.9.10
516-
# via
517-
# aiopg
518-
# sqlalchemy
512+
# via sqlalchemy
519513
pycparser==2.22
520514
# via cffi
521515
pycryptodome==3.21.0
@@ -785,7 +779,6 @@ sqlalchemy==1.4.54
785779
# -r requirements/../../../packages/postgres-database/requirements/_base.in
786780
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/postgres-database/requirements/_base.in
787781
# -r requirements/../../../packages/simcore-sdk/requirements/_base.in
788-
# aiopg
789782
# alembic
790783
starlette==0.41.3
791784
# via

services/api-server/src/simcore_service_api_server/api/dependencies/authentication.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from models_library.products import ProductName
77
from pydantic import BaseModel, PositiveInt
88

9-
from ...db.repositories.api_keys import ApiKeysRepository, UserAndProductTuple
10-
from ...db.repositories.users import UsersRepository
9+
from ...repository.api_keys import ApiKeysRepository, UserAndProductTuple
10+
from ...repository.users import UsersRepository
1111
from .database import get_repository
1212

1313
# SEE https://swagger.io/docs/specification/authentication/basic-authentication/
@@ -22,9 +22,9 @@ class Identity(BaseModel):
2222

2323
def _create_exception() -> HTTPException:
2424
_unauthorized_headers = {
25-
"WWW-Authenticate": f'Basic realm="{basic_scheme.realm}"'
26-
if basic_scheme.realm
27-
else "Basic"
25+
"WWW-Authenticate": (
26+
f'Basic realm="{basic_scheme.realm}"' if basic_scheme.realm else "Basic"
27+
)
2828
}
2929
return HTTPException(
3030
status_code=status.HTTP_401_UNAUTHORIZED,
@@ -46,7 +46,7 @@ async def get_current_identity(
4646
if user_and_product is None:
4747
exc = _create_exception()
4848
raise exc
49-
email = await users_repo.get_active_user_email(user_and_product.user_id)
49+
email = await users_repo.get_active_user_email(user_id=user_and_product.user_id)
5050
if not email:
5151
exc = _create_exception()
5252
raise exc
Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,38 @@
11
import logging
22
from collections.abc import AsyncGenerator, Callable
3-
from typing import Annotated, cast
3+
from typing import Annotated
44

5-
from aiopg.sa import Engine
65
from fastapi import Depends
76
from fastapi.requests import Request
7+
from simcore_postgres_database.utils_aiosqlalchemy import (
8+
get_pg_engine_stateinfo,
9+
)
810
from sqlalchemy.ext.asyncio import AsyncEngine
911

10-
from ...db.events import get_asyncpg_engine
11-
from ...db.repositories import BaseRepository
12-
13-
logger = logging.getLogger(__name__)
14-
12+
from ...clients.postgres import get_engine
13+
from ...repository import BaseRepository
1514

16-
def get_db_engine(request: Request) -> Engine:
17-
return cast(Engine, request.app.state.engine)
15+
_logger = logging.getLogger(__name__)
1816

1917

2018
def get_db_asyncpg_engine(request: Request) -> AsyncEngine:
21-
return get_asyncpg_engine(request.app)
19+
return get_engine(request.app)
2220

2321

2422
def get_repository(repo_type: type[BaseRepository]) -> Callable:
2523
async def _get_repo(
26-
engine: Annotated[Engine, Depends(get_db_engine)],
24+
engine: Annotated[AsyncEngine, Depends(get_db_asyncpg_engine)],
2725
) -> AsyncGenerator[BaseRepository, None]:
2826
# NOTE: 2 different ideas were tried here with not so good
2927
# 1st one was acquiring a connection per repository which lead to the following issue https://github.com/ITISFoundation/osparc-simcore/pull/1966
3028
# 2nd one was acquiring a connection per request which works but blocks the director-v2 responsiveness once
3129
# the max amount of connections is reached
3230
# now the current solution is to acquire connection when needed.
31+
_logger.debug(
32+
"Setting up a repository. Current state of connections: %s",
33+
await get_pg_engine_stateinfo(engine),
34+
)
3335

34-
available_engines = engine.maxsize - (engine.size - engine.freesize)
35-
if available_engines <= 1:
36-
logger.warning(
37-
"Low pg connections available in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
38-
engine.size,
39-
engine.size - engine.freesize,
40-
engine.freesize,
41-
engine.minsize,
42-
engine.maxsize,
43-
)
4436
yield repo_type(db_engine=engine)
4537

4638
return _get_repo
47-
48-
49-
__all__: tuple[str, ...] = (
50-
"Engine",
51-
"get_db_engine",
52-
"get_repository",
53-
)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from fastapi import FastAPI
2+
from servicelib.fastapi.db_asyncpg_engine import close_db_connection, connect_to_db
3+
from servicelib.fastapi.lifespan_utils import LifespanOnStartupError
4+
from sqlalchemy.ext.asyncio import AsyncEngine
5+
6+
from ..core.settings import ApplicationSettings
7+
8+
9+
class PostgresNotConfiguredError(LifespanOnStartupError):
10+
msg_template = LifespanOnStartupError.msg_template + (
11+
"Postgres settings are not configured. "
12+
"Please check your application settings. "
13+
)
14+
15+
16+
def get_engine(app: FastAPI) -> AsyncEngine:
17+
assert app.state.engine # nosec
18+
engine: AsyncEngine = app.state.engine
19+
return engine
20+
21+
22+
def setup_postgres(app: FastAPI):
23+
app.state.engine = None
24+
25+
async def _on_startup() -> None:
26+
settings: ApplicationSettings = app.state.settings
27+
if settings.API_SERVER_POSTGRES is None:
28+
raise PostgresNotConfiguredError(
29+
lifespan_name="Postgres",
30+
settings=settings,
31+
)
32+
33+
await connect_to_db(app, settings.API_SERVER_POSTGRES)
34+
assert app.state.engine # nosec
35+
assert isinstance(app.state.engine, AsyncEngine) # nosec
36+
37+
async def _on_shutdown() -> None:
38+
assert app.state.engine # nosec
39+
await close_db_connection(app)
40+
41+
app.add_event_handler("startup", _on_startup)
42+
app.add_event_handler("shutdown", _on_shutdown)

services/api-server/src/simcore_service_api_server/core/application.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from servicelib.fastapi.profiler import initialize_profiler
88
from servicelib.fastapi.tracing import initialize_tracing
99
from servicelib.logging_utils import config_all_loggers
10+
from simcore_service_api_server.clients.postgres import setup_postgres
1011

1112
from .. import exceptions
1213
from .._meta import API_VERSION, API_VTAG, APP_NAME
@@ -15,7 +16,7 @@
1516
from ..services_http import catalog, director_v2, storage, webserver
1617
from ..services_http.rabbitmq import setup_rabbitmq
1718
from ._prometheus_instrumentation import setup_prometheus_instrumentation
18-
from .events import create_start_app_handler, create_stop_app_handler
19+
from .events import on_shutdown, on_startup
1920
from .openapi import override_openapi_method, use_route_names_as_operation_ids
2021
from .settings import ApplicationSettings
2122

@@ -81,6 +82,9 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI:
8182

8283
app.state.settings = settings
8384

85+
if settings.API_SERVER_POSTGRES:
86+
setup_postgres(app)
87+
8488
setup_rabbitmq(app)
8589

8690
if settings.API_SERVER_TRACING:
@@ -115,8 +119,8 @@ def init_app(settings: ApplicationSettings | None = None) -> FastAPI:
115119
)
116120

117121
# setup app
118-
app.add_event_handler("startup", create_start_app_handler(app))
119-
app.add_event_handler("shutdown", create_stop_app_handler(app))
122+
app.add_event_handler("startup", on_startup)
123+
app.add_event_handler("shutdown", on_shutdown)
120124

121125
exceptions.setup_exception_handlers(
122126
app, is_debug=settings.SC_BOOT_MODE == BootModeEnum.DEBUG
Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,15 @@
11
import logging
2-
from collections.abc import Callable
3-
4-
from fastapi import FastAPI
52

63
from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG
7-
from ..db.events import (
8-
asyncpg_close_db_connection,
9-
asyncpg_connect_to_db,
10-
close_db_connection,
11-
connect_to_db,
12-
)
13-
from .settings import ApplicationSettings
14-
15-
logger = logging.getLogger(__name__)
16-
17-
18-
def create_start_app_handler(app: FastAPI) -> Callable:
19-
async def _on_startup() -> None:
20-
logger.info("Application starting ...")
21-
if app.state.settings.API_SERVER_POSTGRES:
22-
# database
23-
assert isinstance(app.state.settings, ApplicationSettings) # nosec
24-
await connect_to_db(app)
25-
await asyncpg_connect_to_db(app, app.state.settings.API_SERVER_POSTGRES)
26-
assert app.state.engine # nosec
27-
assert app.state.asyncpg_engine # nosec
28-
29-
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
30-
31-
return _on_startup
32-
334

34-
def create_stop_app_handler(app: FastAPI) -> Callable:
35-
async def _on_shutdown() -> None:
36-
logger.info("Application stopping, ...")
5+
_logger = logging.getLogger(__name__)
376

38-
if app.state.settings.API_SERVER_POSTGRES:
39-
assert isinstance(app.state.settings, ApplicationSettings) # nosec
40-
try:
41-
await asyncpg_close_db_connection(app)
42-
await close_db_connection(app)
437

44-
except Exception as err: # pylint: disable=broad-except
45-
logger.warning(
46-
"Failed to close app: %s",
47-
err,
48-
exc_info=app.state.settings.debug,
49-
stack_info=app.state.settings.debug,
50-
)
8+
async def on_startup() -> None:
9+
_logger.info("Application starting ...")
10+
print(APP_STARTED_BANNER_MSG, flush=True) # noqa: T201
5111

52-
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201
5312

54-
return _on_shutdown
13+
async def on_shutdown() -> None:
14+
_logger.info("Application stopping, ...")
15+
print(APP_FINISHED_BANNER_MSG, flush=True) # noqa: T201

services/api-server/src/simcore_service_api_server/db/events.py

Lines changed: 0 additions & 87 deletions
This file was deleted.

0 commit comments

Comments
 (0)