-
-
Notifications
You must be signed in to change notification settings - Fork 322
/
Copy pathendpoints.py
60 lines (45 loc) · 1.45 KB
/
endpoints.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""Endpoints module."""
from typing import Annotated
from fastapi import APIRouter, Depends, Response, status
from dependency_injector.wiring import Provide, inject
from .containers import Container
from .repositories import NotFoundError
from .services import UserService
router = APIRouter()
@router.get("/users")
@inject
def get_list(
user_service: Annotated[UserService, Depends(Provide[Container.user_service])],
):
return user_service.get_users()
@router.get("/users/{user_id}")
@inject
def get_by_id(
user_id: int,
user_service: Annotated[UserService, Depends(Provide[Container.user_service])],
):
try:
return user_service.get_user_by_id(user_id)
except NotFoundError:
return Response(status_code=status.HTTP_404_NOT_FOUND)
@router.post("/users", status_code=status.HTTP_201_CREATED)
@inject
def add(
user_service: Annotated[UserService, Depends(Provide[Container.user_service])],
):
return user_service.create_user()
@router.delete("/users/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
@inject
def remove(
user_id: int,
user_service: Annotated[UserService, Depends(Provide[Container.user_service])],
) -> Response:
try:
user_service.delete_user_by_id(user_id)
except NotFoundError:
return Response(status_code=status.HTTP_404_NOT_FOUND)
else:
return Response(status_code=status.HTTP_204_NO_CONTENT)
@router.get("/status")
def get_status():
return {"status": "OK"}