Coverage for api/endpoints/resources/drivers.py: 81%
27 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from fastapi import APIRouter, Depends
2from handlers.resources.drivers import (
3 on_create_driver,
4 on_delete_driver,
5 on_get_driver,
6 on_get_drivers,
7 on_update_driver,
8)
9from keycloak import idp
10from sotrans_models.models._mongo import PydanticObjectIdPath
11from sotrans_models.models.resources.drivers import (
12 DriverCreateModel,
13 DriverDBModel,
14 DriverUpdateModel,
15)
16from sotrans_models.models.responses import ErrorRepr, GenericGetListResponse
17from sotrans_models.models.roles import SotransRole
18from sotrans_models.models.users import SotransOIDCUserModel
19from starlette import status
20from typing_extensions import Annotated
21from utils.data_grabber import BaseGetListQueryParams
23drivers_router = APIRouter(prefix="/drivers", tags=["drivers"])
26@drivers_router.post("", status_code=status.HTTP_201_CREATED)
27async def create_driver(
28 creation_data: DriverCreateModel,
29 executor: Annotated[
30 SotransOIDCUserModel,
31 Depends(
32 idp.get_current_user(
33 required_role_names=[SotransRole.carrier_logistician]
34 )
35 ),
36 ],
37) -> DriverDBModel:
38 return await on_create_driver(creation_data, executor)
41@drivers_router.get("")
42async def get_drivers(
43 _: Annotated[
44 SotransOIDCUserModel,
45 Depends(
46 idp.get_current_user(
47 required_role_names=[
48 SotransRole.company_logistician,
49 SotransRole.drivers_bot,
50 ]
51 )
52 ),
53 ],
54 params: BaseGetListQueryParams = Depends(),
55) -> GenericGetListResponse[DriverDBModel]:
56 return await on_get_drivers(params)
59@drivers_router.get("/{driver_id}", responses={404: {"model": ErrorRepr}})
60async def get_driver(
61 driver_id: PydanticObjectIdPath,
62 executor: Annotated[SotransOIDCUserModel, Depends(idp.get_current_user())],
63) -> DriverDBModel:
64 return await on_get_driver(driver_id, executor)
67@drivers_router.patch("/{driver_id}", responses={404: {"model": ErrorRepr}})
68async def update_driver(
69 driver_id: PydanticObjectIdPath,
70 update_data: DriverUpdateModel,
71 executor: Annotated[
72 SotransOIDCUserModel,
73 Depends(
74 idp.get_current_user(
75 required_role_names=[SotransRole.carrier_logistician]
76 )
77 ),
78 ],
79) -> DriverDBModel:
80 return await on_update_driver(driver_id, update_data, executor)
83@drivers_router.delete(
84 "/{driver_id}",
85 responses={
86 204: {"description": "No content"},
87 404: {"model": ErrorRepr},
88 },
89 status_code=204,
90)
91async def delete_driver(
92 driver_id: PydanticObjectIdPath,
93 executor: Annotated[
94 SotransOIDCUserModel,
95 Depends(idp.get_current_user([SotransRole.carrier_logistician])),
96 ],
97 resource_with_etag: DriverUpdateModel | None = None,
98):
99 await on_delete_driver(
100 driver_id,
101 executor,
102 resource_with_etag.etag if resource_with_etag is not None else None,
103 )