Skip to content

🐛Storage Worker does not need a RabbitMQ client #7426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def create_app(settings: ApplicationSettings) -> FastAPI: # noqa: C901
setup_s3(app)
setup_client_session(app)

setup_rabbitmq(app)
if not settings.STORAGE_WORKER_MODE:
setup_rabbitmq(app)
setup_rpc_api_routes(app)
setup_celery_client(app)
setup_rest_api_long_running_tasks_for_uploads(app)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from fastapi import FastAPI
from servicelib.logging_utils import log_context
from servicelib.rabbitmq import (
RabbitMQClient,
RabbitMQRPCClient,
wait_till_rabbitmq_responsive,
)
Expand All @@ -22,16 +21,12 @@ async def on_startup() -> None:
logging.INFO,
msg="Storage startup Rabbitmq",
):
app.state.rabbitmq_client = None
rabbit_settings: RabbitSettings | None = app.state.settings.STORAGE_RABBITMQ
if not rabbit_settings:
raise ConfigurationError(
msg="RabbitMQ client is de-activated in the settings"
)
await wait_till_rabbitmq_responsive(rabbit_settings.dsn)
app.state.rabbitmq_client = RabbitMQClient(
client_name="storage", settings=rabbit_settings
)
app.state.rabbitmq_rpc_server = await RabbitMQRPCClient.create(
client_name="storage_rpc_server", settings=rabbit_settings
)
Expand All @@ -42,23 +37,13 @@ async def on_shutdown() -> None:
logging.INFO,
msg="Storage shutdown Rabbitmq",
):
if app.state.rabbitmq_client:
await app.state.rabbitmq_client.close()
if app.state.rabbitmq_rpc_server:
await app.state.rabbitmq_rpc_server.close()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient:
if not app.state.rabbitmq_client:
raise ConfigurationError(
msg="RabbitMQ client is not available. Please check the configuration."
)
return cast(RabbitMQClient, app.state.rabbitmq_client)


def get_rabbitmq_rpc_server(app: FastAPI) -> RabbitMQRPCClient:
assert app.state.rabbitmq_rpc_server # nosec
return cast(RabbitMQRPCClient, app.state.rabbitmq_rpc_server)
Loading