Coverage for api/endpoints/resources/trailers.py: 81%
27 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from fastapi import APIRouter, Depends
2from handlers.resources.trailers import (
3 on_create_trailer,
4 on_delete_trailer,
5 on_get_trailer,
6 on_get_trailers,
7 on_update_trailer,
8)
9from keycloak import idp
10from sotrans_models.models._mongo import PydanticObjectIdPath
11from sotrans_models.models.resources.trailers import (
12 TrailerCreateModel,
13 TrailerDBModel,
14 TrailerUpdateModel,
15)
16from sotrans_models.models.responses import ErrorRepr, GenericGetListResponse
17from sotrans_models.models.roles import SotransRole
18from sotrans_models.models.users import SotransOIDCUserModel
19from starlette import status
20from typing_extensions import Annotated
21from utils.data_grabber import BaseGetListQueryParams
23trailers_router = APIRouter(prefix="/trailers", tags=["trailers"])
26@trailers_router.post("", status_code=status.HTTP_201_CREATED)
27async def create_trailer(
28 creation_data: TrailerCreateModel,
29 executor: Annotated[
30 SotransOIDCUserModel,
31 Depends(
32 idp.get_current_user(
33 required_role_names=[SotransRole.carrier_logistician]
34 )
35 ),
36 ],
37) -> TrailerDBModel:
38 return await on_create_trailer(creation_data, executor)
41@trailers_router.get("")
42async def get_trailers(
43 _: Annotated[
44 SotransOIDCUserModel,
45 Depends(
46 idp.get_current_user(
47 required_role_names=[SotransRole.company_logistician]
48 )
49 ),
50 ],
51 params: BaseGetListQueryParams = Depends(),
52) -> GenericGetListResponse[TrailerDBModel]:
53 return await on_get_trailers(params)
56@trailers_router.get("/{trailer_id}", responses={404: {"model": ErrorRepr}})
57async def get_trailer(
58 trailer_id: PydanticObjectIdPath,
59 executor: Annotated[SotransOIDCUserModel, Depends(idp.get_current_user())],
60) -> TrailerDBModel:
61 return await on_get_trailer(trailer_id, executor)
64@trailers_router.patch("/{trailer_id}")
65async def update_trailer(
66 trailer_id: PydanticObjectIdPath,
67 update_data: TrailerUpdateModel,
68 executor: Annotated[
69 SotransOIDCUserModel,
70 Depends(
71 idp.get_current_user(
72 required_role_names=[SotransRole.carrier_logistician]
73 )
74 ),
75 ],
76) -> TrailerDBModel:
77 return await on_update_trailer(trailer_id, update_data, executor)
80@trailers_router.delete(
81 "/{trailer_id}",
82 responses={204: {"description": "No content"}, 404: {"model": ErrorRepr}},
83 status_code=204,
84)
85async def delete_trailer(
86 trailer_id: PydanticObjectIdPath,
87 executor: Annotated[
88 SotransOIDCUserModel,
89 Depends(idp.get_current_user([SotransRole.carrier_logistician])),
90 ],
91 resource_with_etag: TrailerUpdateModel | None = None,
92):
93 await on_delete_trailer(
94 trailer_id,
95 executor,
96 resource_with_etag.etag if resource_with_etag is not None else None,
97 )