Skip to content

Commit 51fca27

Browse files
authored
ensure only one connection acquisition per request (#1966)
* a fastapi depency is cached and thus the same connection is given per request * migrate workbench in DB from FAILURE to FAILED
1 parent cae0146 commit 51fca27

File tree

4 files changed

+144
-58
lines changed

4 files changed

+144
-58
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
"""migrate workbench state enum
2+
3+
Revision ID: cfd1c43b5d33
4+
Revises: c8a7073deebb
5+
Create Date: 2020-11-17 16:42:32.511722+00:00
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = 'cfd1c43b5d33'
14+
down_revision = 'c8a7073deebb'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
op.execute(
21+
sa.DDL(
22+
"""
23+
UPDATE projects
24+
SET workbench = (regexp_replace(workbench::text, '"FAILURE"', '"FAILED"'))::json
25+
WHERE workbench::text LIKE '%%FAILURE%%'
26+
"""
27+
)
28+
)
29+
30+
31+
def downgrade():
32+
# ### commands auto generated by Alembic - please adjust! ###
33+
pass
34+
# ### end Alembic commands ###
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,57 @@
1+
import logging
12
from typing import AsyncGenerator, Callable, Type
23

3-
from aiopg.sa import Engine
4+
from aiopg.sa import Engine, SAConnection
45
from fastapi import Depends
56
from fastapi.requests import Request
67

78
from ...db.repositories import BaseRepository
89

10+
logger = logging.getLogger(__name__)
11+
912

1013
def _get_db_engine(request: Request) -> Engine:
1114
return request.app.state.engine
1215

1316

17+
async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
18+
logger.debug(
19+
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
20+
engine.size,
21+
engine.size - engine.freesize,
22+
engine.freesize,
23+
engine.minsize,
24+
engine.maxsize,
25+
)
26+
if engine.freesize <= 1:
27+
logger.warning(
28+
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
29+
engine.size,
30+
engine.size - engine.freesize,
31+
engine.freesize,
32+
engine.minsize,
33+
engine.maxsize,
34+
)
35+
36+
async with engine.acquire() as conn:
37+
yield conn
38+
39+
logger.debug(
40+
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
41+
engine.size,
42+
engine.size - engine.freesize,
43+
engine.freesize,
44+
engine.minsize,
45+
engine.maxsize,
46+
)
47+
48+
1449
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
1550
async def _get_repo(
16-
engine: Engine = Depends(_get_db_engine),
51+
db_connection: SAConnection = Depends(_acquire_connection),
1752
) -> AsyncGenerator[BaseRepository, None]:
18-
async with engine.acquire() as conn:
19-
yield repo_type(conn)
53+
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
54+
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
55+
yield repo_type(db_connection)
2056

2157
return _get_repo
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from typing import AsyncGenerator, Callable, Type
33

4-
from aiopg.sa import Engine
4+
from aiopg.sa import Engine, SAConnection
55
from fastapi import Depends
66
from fastapi.requests import Request
77

@@ -14,36 +14,44 @@ def _get_db_engine(request: Request) -> Engine:
1414
return request.app.state.engine
1515

1616

17-
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
18-
async def _get_repo(
19-
engine: Engine = Depends(_get_db_engine),
20-
) -> AsyncGenerator[BaseRepository, None]:
21-
22-
logger.debug(
23-
"Acquiring pg connection from pool: current=%d, free=%d, reserved=[%d, %d]",
24-
engine.size,
25-
engine.freesize,
26-
engine.minsize,
27-
engine.maxsize,
28-
)
29-
if engine.freesize <= 1:
30-
logger.warning(
31-
"Last or no pg connection in pool: current=%d, free=%d, reserved=[%d, %d]",
32-
engine.size,
33-
engine.freesize,
34-
engine.minsize,
35-
engine.maxsize,
36-
)
37-
38-
async with engine.acquire() as conn:
39-
yield repo_type(conn)
40-
41-
logger.debug(
42-
"Released pg connection: current=%d, free=%d, reserved=[%d, %d]",
17+
async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
18+
logger.debug(
19+
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
20+
engine.size,
21+
engine.size - engine.freesize,
22+
engine.freesize,
23+
engine.minsize,
24+
engine.maxsize,
25+
)
26+
if engine.freesize <= 1:
27+
logger.warning(
28+
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
4329
engine.size,
30+
engine.size - engine.freesize,
4431
engine.freesize,
4532
engine.minsize,
4633
engine.maxsize,
4734
)
4835

36+
async with engine.acquire() as conn:
37+
yield conn
38+
39+
logger.debug(
40+
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
41+
engine.size,
42+
engine.size - engine.freesize,
43+
engine.freesize,
44+
engine.minsize,
45+
engine.maxsize,
46+
)
47+
48+
49+
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
50+
async def _get_repo(
51+
db_connection: SAConnection = Depends(_acquire_connection),
52+
) -> AsyncGenerator[BaseRepository, None]:
53+
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
54+
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
55+
yield repo_type(db_connection)
56+
4957
return _get_repo
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from typing import AsyncGenerator, Callable, Type
33

4-
from aiopg.sa import Engine
4+
from aiopg.sa import Engine, SAConnection
55
from fastapi import Depends
66
from fastapi.requests import Request
77

@@ -14,36 +14,44 @@ def _get_db_engine(request: Request) -> Engine:
1414
return request.app.state.engine
1515

1616

17-
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
18-
async def _get_repo(
19-
engine: Engine = Depends(_get_db_engine),
20-
) -> AsyncGenerator[BaseRepository, None]:
21-
22-
logger.debug(
23-
"Acquiring pg connection from pool: current=%d, free=%d, reserved=[%d, %d]",
24-
engine.size,
25-
engine.freesize,
26-
engine.minsize,
27-
engine.maxsize,
28-
)
29-
if engine.freesize <= 1:
30-
logger.warning(
31-
"Last or no pg connection in pool: current=%d, free=%d, reserved=[%d, %d]",
32-
engine.size,
33-
engine.freesize,
34-
engine.minsize,
35-
engine.maxsize,
36-
)
37-
38-
async with engine.acquire() as conn:
39-
yield repo_type(conn)
40-
41-
logger.debug(
42-
"Released pg connection: current=%d, free=%d, reserved=[%d, %d]",
17+
async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
18+
logger.debug(
19+
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
20+
engine.size,
21+
engine.size - engine.freesize,
22+
engine.freesize,
23+
engine.minsize,
24+
engine.maxsize,
25+
)
26+
if engine.freesize <= 1:
27+
logger.warning(
28+
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
4329
engine.size,
30+
engine.size - engine.freesize,
4431
engine.freesize,
4532
engine.minsize,
4633
engine.maxsize,
4734
)
4835

36+
async with engine.acquire() as conn:
37+
yield conn
38+
39+
logger.debug(
40+
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
41+
engine.size,
42+
engine.size - engine.freesize,
43+
engine.freesize,
44+
engine.minsize,
45+
engine.maxsize,
46+
)
47+
48+
49+
def get_repository(repo_type: Type[BaseRepository]) -> Callable:
50+
async def _get_repo(
51+
db_connection: SAConnection = Depends(_acquire_connection),
52+
) -> AsyncGenerator[BaseRepository, None]:
53+
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
54+
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
55+
yield repo_type(db_connection)
56+
4957
return _get_repo

0 commit comments

Comments
 (0)