Skip to content

Commit 3f0f517

Browse files
committed
seems to work better
1 parent 9cc24b1 commit 3f0f517

File tree

2 files changed

+39
-38
lines changed
  • packages/aws-library/src/aws_library/s3
  • services/storage/src/simcore_service_storage/modules/celery

2 files changed

+39
-38
lines changed

packages/aws-library/src/aws_library/s3/_client.py

+22-13
Original file line numberDiff line numberDiff line change
@@ -85,21 +85,30 @@ async def create(
8585
cls, settings: S3Settings, s3_max_concurrency: int = _S3_MAX_CONCURRENCY_DEFAULT
8686
) -> "SimcoreS3API":
8787
session = aioboto3.Session()
88-
session_client = session.client( # type: ignore[call-overload]
89-
"s3",
90-
endpoint_url=f"{settings.S3_ENDPOINT}",
91-
aws_access_key_id=settings.S3_ACCESS_KEY,
92-
aws_secret_access_key=settings.S3_SECRET_KEY,
93-
region_name=settings.S3_REGION,
94-
config=Config(signature_version="s3v4"),
95-
)
96-
assert isinstance(session_client, ClientCreatorContext) # nosec
88+
session_client = None
9789
exit_stack = contextlib.AsyncExitStack()
98-
s3_client = cast(S3Client, await exit_stack.enter_async_context(session_client))
99-
# NOTE: this triggers a botocore.exception.ClientError in case the connection is not made to the S3 backend
100-
await s3_client.list_buckets()
90+
try:
91+
session_client = session.client( # type: ignore[call-overload]
92+
"s3",
93+
endpoint_url=f"{settings.S3_ENDPOINT}",
94+
aws_access_key_id=settings.S3_ACCESS_KEY,
95+
aws_secret_access_key=settings.S3_SECRET_KEY,
96+
region_name=settings.S3_REGION,
97+
config=Config(signature_version="s3v4"),
98+
)
99+
assert isinstance(session_client, ClientCreatorContext) # nosec
101100

102-
return cls(s3_client, session, exit_stack, s3_max_concurrency)
101+
s3_client = cast(
102+
S3Client, await exit_stack.enter_async_context(session_client)
103+
)
104+
# NOTE: this triggers a botocore.exception.ClientError in case the connection is not made to the S3 backend
105+
await s3_client.list_buckets()
106+
107+
return cls(s3_client, session, exit_stack, s3_max_concurrency)
108+
except Exception:
109+
await exit_stack.aclose()
110+
111+
raise
103112

104113
async def close(self) -> None:
105114
await self._exit_stack.aclose()

services/storage/src/simcore_service_storage/modules/celery/signals.py

+17-25
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
from asgi_lifespan import LifespanManager
88
from celery import Celery # type: ignore[import-untyped]
99
from fastapi import FastAPI
10-
from servicelib.async_utils import cancel_wait_task
10+
from servicelib.logging_utils import log_context
1111

1212
from ...core.application import create_app
1313
from ...core.settings import ApplicationSettings
14-
from ...modules.celery import get_event_loop, set_event_loop
14+
from ...modules.celery import set_event_loop
1515
from ...modules.celery.utils import (
1616
get_fastapi_app,
1717
set_celery_worker,
@@ -21,8 +21,8 @@
2121

2222
_logger = logging.getLogger(__name__)
2323

24-
_LIFESPAN_TIMEOUT: Final[int] = 10
25-
_FASTAPI_STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()
24+
_SHUTDOWN_TIMEOUT: Final[float] = datetime.timedelta(seconds=10).total_seconds()
25+
_STARTUP_TIMEOUT: Final[float] = datetime.timedelta(minutes=1).total_seconds()
2626

2727

2828
def on_worker_init(sender, **_kwargs) -> None:
@@ -35,11 +35,13 @@ def _init_fastapi(startup_complete_event: threading.Event) -> None:
3535

3636
fastapi_app = create_app(ApplicationSettings.create_from_envs())
3737

38-
async def lifespan(startup_complete_event: threading.Event) -> None:
38+
async def lifespan(
39+
startup_complete_event: threading.Event, shutdown_event: asyncio.Event
40+
) -> None:
3941
async with LifespanManager(
4042
fastapi_app,
41-
startup_timeout=_LIFESPAN_TIMEOUT,
42-
shutdown_timeout=_LIFESPAN_TIMEOUT,
43+
startup_timeout=_STARTUP_TIMEOUT,
44+
shutdown_timeout=_SHUTDOWN_TIMEOUT,
4345
):
4446
try:
4547
_logger.info("fastapi APP started!")
@@ -48,37 +50,27 @@ async def lifespan(startup_complete_event: threading.Event) -> None:
4850
except asyncio.CancelledError:
4951
_logger.warning("Lifespan task cancelled")
5052

51-
lifespan_task = loop.create_task(lifespan(startup_complete_event))
52-
fastapi_app.state.lifespan_task = lifespan_task
5353
fastapi_app.state.shutdown_event = shutdown_event
5454
set_event_loop(fastapi_app, loop)
5555

5656
set_fastapi_app(sender.app, fastapi_app)
5757
set_celery_worker(sender.app, CeleryTaskQueueWorker(sender.app))
58-
59-
loop.run_forever()
58+
loop.run_until_complete(lifespan(startup_complete_event, shutdown_event))
6059

6160
thread = threading.Thread(
61+
group=None,
6262
target=_init_fastapi,
6363
name="fastapi_app",
6464
args=(startup_complete_event,),
65-
daemon=True,
6665
)
6766
thread.start()
6867
# ensure the fastapi app is ready before going on
69-
startup_complete_event.wait(_FASTAPI_STARTUP_TIMEOUT)
70-
71-
72-
def on_worker_shutdown(sender, **_kwargs):
73-
assert isinstance(sender.app, Celery)
68+
startup_complete_event.wait(_STARTUP_TIMEOUT * 1.1)
7469

75-
fastapi_app = get_fastapi_app(sender.app)
76-
assert isinstance(fastapi_app, FastAPI)
77-
event_loop = get_event_loop(fastapi_app)
7870

79-
async def shutdown():
71+
def on_worker_shutdown(sender, **_kwargs) -> None:
72+
with log_context(_logger, logging.INFO, "Worker Shuts-down"):
73+
assert isinstance(sender.app, Celery)
74+
fastapi_app = get_fastapi_app(sender.app)
75+
assert isinstance(fastapi_app, FastAPI)
8076
fastapi_app.state.shutdown_event.set()
81-
82-
await cancel_wait_task(fastapi_app.state.lifespan_task, max_delay=5)
83-
84-
asyncio.run_coroutine_threadsafe(shutdown(), event_loop)

0 commit comments

Comments
 (0)