Skip to content

ensure only one connection acquisition per request #1966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""migrate workbench state enum

Revision ID: cfd1c43b5d33
Revises: c8a7073deebb
Create Date: 2020-11-17 16:42:32.511722+00:00

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'cfd1c43b5d33'
down_revision = 'c8a7073deebb'
branch_labels = None
depends_on = None


def upgrade():
op.execute(
sa.DDL(
"""
UPDATE projects
SET workbench = (regexp_replace(workbench::text, '"FAILURE"', '"FAILED"'))::json
WHERE workbench::text LIKE '%%FAILURE%%'
"""
)
)


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -1,21 +1,57 @@
import logging
from typing import AsyncGenerator, Callable, Type

from aiopg.sa import Engine
from aiopg.sa import Engine, SAConnection
from fastapi import Depends
from fastapi.requests import Request

from ...db.repositories import BaseRepository

logger = logging.getLogger(__name__)


def _get_db_engine(request: Request) -> Engine:
return request.app.state.engine


async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
logger.debug(
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)
if engine.freesize <= 1:
logger.warning(
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)

async with engine.acquire() as conn:
yield conn

logger.debug(
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)


def get_repository(repo_type: Type[BaseRepository]) -> Callable:
async def _get_repo(
engine: Engine = Depends(_get_db_engine),
db_connection: SAConnection = Depends(_acquire_connection),
) -> AsyncGenerator[BaseRepository, None]:
async with engine.acquire() as conn:
yield repo_type(conn)
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
yield repo_type(db_connection)

return _get_repo
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import AsyncGenerator, Callable, Type

from aiopg.sa import Engine
from aiopg.sa import Engine, SAConnection
from fastapi import Depends
from fastapi.requests import Request

Expand All @@ -14,36 +14,44 @@ def _get_db_engine(request: Request) -> Engine:
return request.app.state.engine


def get_repository(repo_type: Type[BaseRepository]) -> Callable:
async def _get_repo(
engine: Engine = Depends(_get_db_engine),
) -> AsyncGenerator[BaseRepository, None]:

logger.debug(
"Acquiring pg connection from pool: current=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.freesize,
engine.minsize,
engine.maxsize,
)
if engine.freesize <= 1:
logger.warning(
"Last or no pg connection in pool: current=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.freesize,
engine.minsize,
engine.maxsize,
)

async with engine.acquire() as conn:
yield repo_type(conn)

logger.debug(
"Released pg connection: current=%d, free=%d, reserved=[%d, %d]",
async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
logger.debug(
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)
if engine.freesize <= 1:
logger.warning(
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)

async with engine.acquire() as conn:
yield conn

logger.debug(
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)


def get_repository(repo_type: Type[BaseRepository]) -> Callable:
async def _get_repo(
db_connection: SAConnection = Depends(_acquire_connection),
) -> AsyncGenerator[BaseRepository, None]:
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
yield repo_type(db_connection)

return _get_repo
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import AsyncGenerator, Callable, Type

from aiopg.sa import Engine
from aiopg.sa import Engine, SAConnection
from fastapi import Depends
from fastapi.requests import Request

Expand All @@ -14,36 +14,44 @@ def _get_db_engine(request: Request) -> Engine:
return request.app.state.engine


def get_repository(repo_type: Type[BaseRepository]) -> Callable:
async def _get_repo(
engine: Engine = Depends(_get_db_engine),
) -> AsyncGenerator[BaseRepository, None]:

logger.debug(
"Acquiring pg connection from pool: current=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.freesize,
engine.minsize,
engine.maxsize,
)
if engine.freesize <= 1:
logger.warning(
"Last or no pg connection in pool: current=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.freesize,
engine.minsize,
engine.maxsize,
)

async with engine.acquire() as conn:
yield repo_type(conn)

logger.debug(
"Released pg connection: current=%d, free=%d, reserved=[%d, %d]",
async def _acquire_connection(engine: Engine = Depends(_get_db_engine)) -> SAConnection:
logger.debug(
"Acquiring pg connection from pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)
if engine.freesize <= 1:
logger.warning(
"Last or no pg connection in pool: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)

async with engine.acquire() as conn:
yield conn

logger.debug(
"Released pg connection: pool size=%d, acquired=%d, free=%d, reserved=[%d, %d]",
engine.size,
engine.size - engine.freesize,
engine.freesize,
engine.minsize,
engine.maxsize,
)


def get_repository(repo_type: Type[BaseRepository]) -> Callable:
async def _get_repo(
db_connection: SAConnection = Depends(_acquire_connection),
) -> AsyncGenerator[BaseRepository, None]:
# NOTE: Since _acquire_connection is a dependency, it is a cached by FastApi and is only called once per request
# Be very careful if you change this!!! or we will end up in the problem described in https://github.com/ITISFoundation/osparc-simcore/pull/1966
yield repo_type(db_connection)

return _get_repo