Coverage for api/endpoints/misc/documents.py: 73%
37 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from typing import Annotated
3from fastapi import APIRouter, Depends
4from handlers.misc.documents import (
5 on_create_document,
6 on_delete_document,
7 on_get_document,
8 on_get_documents,
9 on_get_documents_new,
10 on_update_document,
11)
12from keycloak import idp
13from sotrans_models.models._mongo import PydanticObjectIdPath
14from sotrans_models.models.misc.document import (
15 DocumentCreateModel,
16 DocumentDBModel,
17 DocumentUpdateModel,
18)
19from sotrans_models.models.responses import ErrorRepr, GenericGetListResponse
20from sotrans_models.models.roles import SotransRole
21from sotrans_models.models.users import SotransOIDCUserModel
22from starlette import status
23from utils.data_grabber import BaseGetListQueryParams, BaseGetOneQueryParams
25documents_router = APIRouter(prefix="/documents", tags=["documents"])
27documents_col_router = APIRouter(prefix="", tags=["documents"])
30@documents_router.post(
31 "",
32 status_code=status.HTTP_201_CREATED,
33 responses={400: {"model": ErrorRepr}},
34)
35async def create_document(
36 document: DocumentCreateModel,
37 user: Annotated[
38 SotransOIDCUserModel,
39 Depends(
40 idp.get_current_user(
41 required_role_names=[SotransRole.carrier_logistician]
42 )
43 ),
44 ],
45) -> DocumentDBModel:
46 return await on_create_document(document, user)
49@documents_col_router.post(
50 "/{collection}/{object_id}/documents",
51 status_code=status.HTTP_201_CREATED,
52 responses={400: {"model": ErrorRepr}},
53)
54async def create_document_new(
55 collection: str,
56 object_id: PydanticObjectIdPath,
57 document: DocumentUpdateModel,
58 user: Annotated[
59 SotransOIDCUserModel,
60 Depends(
61 idp.get_current_user(
62 required_role_names=[
63 SotransRole.drivers_bot,
64 SotransRole.carrier_logistician,
65 ]
66 )
67 ),
68 ],
69) -> DocumentDBModel:
70 document.object_id = object_id
71 document.collection = collection
72 document = DocumentCreateModel(**document.model_dump())
73 return await on_create_document(document, user)
76@documents_router.get("")
77async def get_documents(
78 user: Annotated[
79 SotransOIDCUserModel,
80 Depends(
81 idp.get_current_user(
82 required_role_names=[SotransRole.carrier_logistician]
83 )
84 ),
85 ],
86 params: BaseGetListQueryParams = Depends(),
87) -> GenericGetListResponse[DocumentDBModel]:
88 return await on_get_documents(user, params)
91@documents_col_router.get(
92 "/{collection}/{object_id}/documents",
93)
94async def get_documents_new(
95 collection: str,
96 object_id: PydanticObjectIdPath,
97 user: Annotated[
98 SotransOIDCUserModel,
99 Depends(
100 idp.get_current_user(
101 required_role_names=[SotransRole.carrier_logistician]
102 )
103 ),
104 ],
105 params: BaseGetListQueryParams = Depends(),
106) -> GenericGetListResponse[DocumentDBModel]:
107 return await on_get_documents_new(user, collection, object_id, params)
110@documents_router.get("/{document_id}", responses={404: {"model": ErrorRepr}})
111async def document_details(
112 user: Annotated[
113 SotransOIDCUserModel,
114 Depends(
115 idp.get_current_user(
116 required_role_names=[SotransRole.carrier_logistician]
117 )
118 ),
119 ],
120 document_id: PydanticObjectIdPath,
121 params: BaseGetOneQueryParams = Depends(),
122) -> DocumentDBModel:
123 return await on_get_document(user, document_id, params)
126@documents_router.delete(
127 "/{document_id}",
128 responses={204: {"description": "No content"}, 404: {"model": ErrorRepr}},
129 status_code=204,
130)
131async def remove_document(
132 document_id: PydanticObjectIdPath,
133 user: Annotated[
134 SotransOIDCUserModel,
135 Depends(
136 idp.get_current_user(
137 required_role_names=[SotransRole.carrier_logistician]
138 )
139 ),
140 ],
141 resource_with_etag: DocumentUpdateModel | None = None,
142):
143 await on_delete_document(
144 user,
145 document_id,
146 None if resource_with_etag is None else resource_with_etag.etag,
147 )
150@documents_router.patch(
151 "/{doc_id}",
152 responses={404: {"model": ErrorRepr}, 406: {"model": ErrorRepr}},
153)
154async def update_document(
155 doc_id: PydanticObjectIdPath,
156 document: DocumentUpdateModel,
157 user: Annotated[
158 SotransOIDCUserModel,
159 Depends(
160 idp.get_current_user(
161 required_role_names=[SotransRole.carrier_logistician]
162 )
163 ),
164 ],
165) -> DocumentDBModel:
166 return await on_update_document(user, doc_id, document)