Coverage for api/endpoints/orders/buffer_orders.py: 75%
48 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from io import BytesIO
3from aiofiles import os as aios
4from fastapi import APIRouter, Depends, File, UploadFile
5from handlers.orders.buffer_orders import (
6 on_assign_subsidiary,
7 on_create_order,
8 on_get_buffer,
9 on_get_one_from_buffer,
10 on_put_to_exchange,
11 on_put_to_trash,
12 on_update_order,
13)
14from keycloak import idp
15from sotrans_models.models._mongo import PydanticObjectIdPath
16from sotrans_models.models.orders.order import (
17 OrderCreateModel,
18 OrderDBModel,
19 OrderUpdateModel,
20)
21from sotrans_models.models.responses import (
22 ErrorRepr,
23 GenericGetListResponse,
24 MultiselectError,
25)
26from sotrans_models.models.roles import SotransRole
27from sotrans_models.models.users import SotransOIDCUserModel
28from starlette import status
29from starlette.background import BackgroundTasks
30from starlette.responses import FileResponse
31from typing_extensions import Annotated
32from utils.data_grabber import BaseGetListQueryParams, BaseGetOneQueryParams
33from utils.excel import export_buffer, on_import_to_buffer
34from utils.helper import multiexec
36buffer_orders_router = APIRouter(
37 prefix="/buffer-orders", tags=["buffer", "orders"]
38)
41@buffer_orders_router.put("/{order_ids}/to-appointment")
42async def assign_order(
43 order_ids: str,
44 user: Annotated[
45 SotransOIDCUserModel,
46 Depends(
47 idp.get_current_user(
48 required_role_names=[SotransRole.company_manager]
49 )
50 ),
51 ],
52 order: OrderUpdateModel,
53) -> MultiselectError:
54 """Accepts assigned(req) and end_price."""
55 return await multiexec(order_ids, on_assign_subsidiary, user, order)
58@buffer_orders_router.put("/{order_ids}/to-exchange")
59async def put_order_to_exchange(
60 order_ids: str,
61 user: Annotated[
62 SotransOIDCUserModel,
63 Depends(
64 idp.get_current_user(
65 required_role_names=[SotransRole.company_manager]
66 )
67 ),
68 ],
69 order: OrderUpdateModel,
70) -> MultiselectError:
71 """Accepts start_price(req), auction_end_time(must be in result) and assigned."""
72 return await multiexec(order_ids, on_put_to_exchange, user, order)
75@buffer_orders_router.get("")
76async def get_buffer_orders(
77 user: Annotated[
78 SotransOIDCUserModel,
79 Depends(
80 idp.get_current_user(
81 required_role_names=[SotransRole.company_manager]
82 )
83 ),
84 ],
85 params: BaseGetListQueryParams = Depends(),
86) -> GenericGetListResponse[OrderDBModel]:
87 return await on_get_buffer(user, params)
90@buffer_orders_router.get("/xlsx")
91async def excel_export_buffer(
92 user: Annotated[
93 SotransOIDCUserModel,
94 Depends(
95 idp.get_current_user(
96 required_role_names=[SotransRole.company_manager]
97 )
98 ),
99 ],
100 bg_task: BackgroundTasks,
101 params: BaseGetListQueryParams = Depends(),
102) -> FileResponse:
103 path = await export_buffer(user, params)
104 bg_task.add_task(aios.remove, path)
105 return FileResponse(path)
108@buffer_orders_router.post(
109 "",
110 status_code=status.HTTP_201_CREATED,
111 responses={400: {"model": ErrorRepr}},
112)
113async def insert_into_buffer(
114 user: Annotated[
115 SotransOIDCUserModel,
116 Depends(
117 idp.get_current_user(
118 required_role_names=[SotransRole.company_manager]
119 )
120 ),
121 ],
122 order: OrderCreateModel,
123) -> OrderDBModel:
124 return await on_create_order(user, order)
127@buffer_orders_router.get("/{order_id}", responses={404: {"model": ErrorRepr}})
128async def buffer_details(
129 order_id: PydanticObjectIdPath,
130 user: Annotated[
131 SotransOIDCUserModel,
132 Depends(
133 idp.get_current_user(
134 required_role_names=[SotransRole.company_manager]
135 )
136 ),
137 ],
138 params: BaseGetOneQueryParams = Depends(),
139) -> OrderDBModel:
140 return await on_get_one_from_buffer(user, order_id, params)
143@buffer_orders_router.put(
144 "/{order_ids}/to-trash", responses={404: {"model": ErrorRepr}}
145)
146async def put_order_to_trash(
147 order_ids: str,
148 user: Annotated[
149 SotransOIDCUserModel,
150 Depends(
151 idp.get_current_user(
152 required_role_names=[SotransRole.company_manager]
153 )
154 ),
155 ],
156) -> MultiselectError:
157 return await multiexec(order_ids, on_put_to_trash, user)
160@buffer_orders_router.patch(
161 "/{order_id}", responses={400: {"model": ErrorRepr}}
162)
163async def update_order_data(
164 order_id: PydanticObjectIdPath,
165 user: Annotated[
166 SotransOIDCUserModel,
167 Depends(
168 idp.get_current_user(
169 required_role_names=[SotransRole.company_manager]
170 )
171 ),
172 ],
173 order: OrderUpdateModel,
174) -> OrderDBModel:
175 return await on_update_order(user, order_id, order)
178@buffer_orders_router.post("/xlsx")
179async def import_to_buffer(
180 user: Annotated[
181 SotransOIDCUserModel,
182 Depends(
183 idp.get_current_user(
184 required_role_names=[SotransRole.company_manager]
185 )
186 ),
187 ],
188 xls_file: UploadFile = File(...),
189) -> GenericGetListResponse[OrderDBModel]:
190 file_data = BytesIO(await xls_file.read())
191 return await on_import_to_buffer(file_data, user)