Coverage for api/endpoints/misc/documents.py: 73%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from typing import Annotated 

2 

3from fastapi import APIRouter, Depends 

4from handlers.misc.documents import ( 

5 on_create_document, 

6 on_delete_document, 

7 on_get_document, 

8 on_get_documents, 

9 on_get_documents_new, 

10 on_update_document, 

11) 

12from keycloak import idp 

13from sotrans_models.models._mongo import PydanticObjectIdPath 

14from sotrans_models.models.misc.document import ( 

15 DocumentCreateModel, 

16 DocumentDBModel, 

17 DocumentUpdateModel, 

18) 

19from sotrans_models.models.responses import ErrorRepr, GenericGetListResponse 

20from sotrans_models.models.roles import SotransRole 

21from sotrans_models.models.users import SotransOIDCUserModel 

22from starlette import status 

23from utils.data_grabber import BaseGetListQueryParams, BaseGetOneQueryParams 

24 

25documents_router = APIRouter(prefix="/documents", tags=["documents"]) 

26 

27documents_col_router = APIRouter(prefix="", tags=["documents"]) 

28 

29 

30@documents_router.post( 

31 "", 

32 status_code=status.HTTP_201_CREATED, 

33 responses={400: {"model": ErrorRepr}}, 

34) 

35async def create_document( 

36 document: DocumentCreateModel, 

37 user: Annotated[ 

38 SotransOIDCUserModel, 

39 Depends( 

40 idp.get_current_user( 

41 required_role_names=[SotransRole.carrier_logistician] 

42 ) 

43 ), 

44 ], 

45) -> DocumentDBModel: 

46 return await on_create_document(document, user) 

47 

48 

49@documents_col_router.post( 

50 "/{collection}/{object_id}/documents", 

51 status_code=status.HTTP_201_CREATED, 

52 responses={400: {"model": ErrorRepr}}, 

53) 

54async def create_document_new( 

55 collection: str, 

56 object_id: PydanticObjectIdPath, 

57 document: DocumentUpdateModel, 

58 user: Annotated[ 

59 SotransOIDCUserModel, 

60 Depends( 

61 idp.get_current_user( 

62 required_role_names=[ 

63 SotransRole.drivers_bot, 

64 SotransRole.carrier_logistician, 

65 ] 

66 ) 

67 ), 

68 ], 

69) -> DocumentDBModel: 

70 document.object_id = object_id 

71 document.collection = collection 

72 document = DocumentCreateModel(**document.model_dump()) 

73 return await on_create_document(document, user) 

74 

75 

76@documents_router.get("") 

77async def get_documents( 

78 user: Annotated[ 

79 SotransOIDCUserModel, 

80 Depends( 

81 idp.get_current_user( 

82 required_role_names=[SotransRole.carrier_logistician] 

83 ) 

84 ), 

85 ], 

86 params: BaseGetListQueryParams = Depends(), 

87) -> GenericGetListResponse[DocumentDBModel]: 

88 return await on_get_documents(user, params) 

89 

90 

91@documents_col_router.get( 

92 "/{collection}/{object_id}/documents", 

93) 

94async def get_documents_new( 

95 collection: str, 

96 object_id: PydanticObjectIdPath, 

97 user: Annotated[ 

98 SotransOIDCUserModel, 

99 Depends( 

100 idp.get_current_user( 

101 required_role_names=[SotransRole.carrier_logistician] 

102 ) 

103 ), 

104 ], 

105 params: BaseGetListQueryParams = Depends(), 

106) -> GenericGetListResponse[DocumentDBModel]: 

107 return await on_get_documents_new(user, collection, object_id, params) 

108 

109 

110@documents_router.get("/{document_id}", responses={404: {"model": ErrorRepr}}) 

111async def document_details( 

112 user: Annotated[ 

113 SotransOIDCUserModel, 

114 Depends( 

115 idp.get_current_user( 

116 required_role_names=[SotransRole.carrier_logistician] 

117 ) 

118 ), 

119 ], 

120 document_id: PydanticObjectIdPath, 

121 params: BaseGetOneQueryParams = Depends(), 

122) -> DocumentDBModel: 

123 return await on_get_document(user, document_id, params) 

124 

125 

126@documents_router.delete( 

127 "/{document_id}", 

128 responses={204: {"description": "No content"}, 404: {"model": ErrorRepr}}, 

129 status_code=204, 

130) 

131async def remove_document( 

132 document_id: PydanticObjectIdPath, 

133 user: Annotated[ 

134 SotransOIDCUserModel, 

135 Depends( 

136 idp.get_current_user( 

137 required_role_names=[SotransRole.carrier_logistician] 

138 ) 

139 ), 

140 ], 

141 resource_with_etag: DocumentUpdateModel | None = None, 

142): 

143 await on_delete_document( 

144 user, 

145 document_id, 

146 None if resource_with_etag is None else resource_with_etag.etag, 

147 ) 

148 

149 

150@documents_router.patch( 

151 "/{doc_id}", 

152 responses={404: {"model": ErrorRepr}, 406: {"model": ErrorRepr}}, 

153) 

154async def update_document( 

155 doc_id: PydanticObjectIdPath, 

156 document: DocumentUpdateModel, 

157 user: Annotated[ 

158 SotransOIDCUserModel, 

159 Depends( 

160 idp.get_current_user( 

161 required_role_names=[SotransRole.carrier_logistician] 

162 ) 

163 ), 

164 ], 

165) -> DocumentDBModel: 

166 return await on_update_document(user, doc_id, document)