Coverage for api/endpoints/resources/trailers.py: 81%

27 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from fastapi import APIRouter, Depends 

2from handlers.resources.trailers import ( 

3 on_create_trailer, 

4 on_delete_trailer, 

5 on_get_trailer, 

6 on_get_trailers, 

7 on_update_trailer, 

8) 

9from keycloak import idp 

10from sotrans_models.models._mongo import PydanticObjectIdPath 

11from sotrans_models.models.resources.trailers import ( 

12 TrailerCreateModel, 

13 TrailerDBModel, 

14 TrailerUpdateModel, 

15) 

16from sotrans_models.models.responses import ErrorRepr, GenericGetListResponse 

17from sotrans_models.models.roles import SotransRole 

18from sotrans_models.models.users import SotransOIDCUserModel 

19from starlette import status 

20from typing_extensions import Annotated 

21from utils.data_grabber import BaseGetListQueryParams 

22 

# Router that groups the trailer endpoints defined in this module; it is
# created with the "/trailers" path prefix and the "trailers" tag.
trailers_router = APIRouter(prefix="/trailers", tags=["trailers"])

24 

25 

# POST /trailers -- responds with 201 Created.
# The request body is parsed as ``TrailerCreateModel``. The caller is resolved
# through ``idp.get_current_user`` with ``carrier_logistician`` as the required
# role name, and both values are handed to ``on_create_trailer``, whose result
# is returned as the response.
@trailers_router.post("", status_code=status.HTTP_201_CREATED)
async def create_trailer(
    creation_data: TrailerCreateModel,
    executor: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.carrier_logistician],
            ),
        ),
    ],
) -> TrailerDBModel:
    created_trailer = await on_create_trailer(creation_data, executor)
    return created_trailer

39 

40 

# GET /trailers -- list endpoint.
# Query parameters are collected into ``BaseGetListQueryParams`` and passed to
# ``on_get_trailers``. The resolved user is not forwarded to the handler, so
# the dependency is bound to ``_`` and serves only as the access check.
# NOTE(review): this route requires ``company_logistician`` while the
# create/update/delete routes in this module require ``carrier_logistician``
# -- confirm the difference is intended.
@trailers_router.get("")
async def get_trailers(
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician],
            ),
        ),
    ],
    params: BaseGetListQueryParams = Depends(),
) -> GenericGetListResponse[TrailerDBModel]:
    trailers_page = await on_get_trailers(params)
    return trailers_page

54 

55 

# GET /trailers/{trailer_id} -- single-item endpoint; a 404 response with the
# ``ErrorRepr`` model is declared for the API schema.
# ``idp.get_current_user`` is called without role names here; the resolved
# user is forwarded to ``on_get_trailer`` together with the path id.
@trailers_router.get(
    "/{trailer_id}",
    responses={404: {"model": ErrorRepr}},
)
async def get_trailer(
    trailer_id: PydanticObjectIdPath,
    executor: Annotated[
        SotransOIDCUserModel,
        Depends(idp.get_current_user()),
    ],
) -> TrailerDBModel:
    found_trailer = await on_get_trailer(trailer_id, executor)
    return found_trailer

62 

63 

# PATCH /trailers/{trailer_id}.
# The request body is parsed as ``TrailerUpdateModel``; the path id, the body
# and the resolved user (``carrier_logistician`` is the required role name)
# are passed to ``on_update_trailer`` and its result is returned.
# NOTE(review): unlike the GET and DELETE routes on "/{trailer_id}", this
# route declares no 404 response -- confirm whether the handler can raise one
# and, if so, document it in ``responses``.
@trailers_router.patch("/{trailer_id}")
async def update_trailer(
    trailer_id: PydanticObjectIdPath,
    update_data: TrailerUpdateModel,
    executor: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.carrier_logistician],
            ),
        ),
    ],
) -> TrailerDBModel:
    updated_trailer = await on_update_trailer(trailer_id, update_data, executor)
    return updated_trailer

78 

79 

# DELETE /trailers/{trailer_id} -- responds with 204 and no body; a 404
# response with the ``ErrorRepr`` model is declared for the API schema.
# The role list is passed by keyword (``required_role_names=``), matching the
# other role-guarded routes in this module, so the call does not depend on the
# parameter order of ``idp.get_current_user``.
# ``resource_with_etag`` is optional; when it is supplied only its ``etag``
# attribute is forwarded to ``on_delete_trailer``, otherwise ``None`` is.
@trailers_router.delete(
    "/{trailer_id}",
    responses={204: {"description": "No content"}, 404: {"model": ErrorRepr}},
    status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_trailer(
    trailer_id: PydanticObjectIdPath,
    executor: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.carrier_logistician],
            ),
        ),
    ],
    resource_with_etag: TrailerUpdateModel | None = None,
):
    etag = None
    if resource_with_etag is not None:
        etag = resource_with_etag.etag
    await on_delete_trailer(trailer_id, executor, etag)