Coverage for api/endpoints/orders/buffer_orders.py: 75%

48 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from io import BytesIO 

2 

3from aiofiles import os as aios 

4from fastapi import APIRouter, Depends, File, UploadFile 

5from handlers.orders.buffer_orders import ( 

6 on_assign_subsidiary, 

7 on_create_order, 

8 on_get_buffer, 

9 on_get_one_from_buffer, 

10 on_put_to_exchange, 

11 on_put_to_trash, 

12 on_update_order, 

13) 

14from keycloak import idp 

15from sotrans_models.models._mongo import PydanticObjectIdPath 

16from sotrans_models.models.orders.order import ( 

17 OrderCreateModel, 

18 OrderDBModel, 

19 OrderUpdateModel, 

20) 

21from sotrans_models.models.responses import ( 

22 ErrorRepr, 

23 GenericGetListResponse, 

24 MultiselectError, 

25) 

26from sotrans_models.models.roles import SotransRole 

27from sotrans_models.models.users import SotransOIDCUserModel 

28from starlette import status 

29from starlette.background import BackgroundTasks 

30from starlette.responses import FileResponse 

31from typing_extensions import Annotated 

32from utils.data_grabber import BaseGetListQueryParams, BaseGetOneQueryParams 

33from utils.excel import export_buffer, on_import_to_buffer 

34from utils.helper import multiexec 

35 

# Router for every buffer-order endpoint in this module; all routes below are
# registered on it and therefore share the "/buffer-orders" prefix and tags.
buffer_orders_router = APIRouter(
    prefix="/buffer-orders",
    tags=["buffer", "orders"],
)

39 

40 

@buffer_orders_router.put("/{order_ids}/to-appointment")
async def assign_order(
    order_ids: str,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    order: OrderUpdateModel,
) -> MultiselectError:
    """Accepts assigned(req) and end_price."""
    # `order_ids` is forwarded to `multiexec` as a raw string together with
    # the per-order handler; how the string is split into individual ids is
    # decided by `multiexec` (presumably a delimited list -- confirm there).
    outcome = await multiexec(order_ids, on_assign_subsidiary, user, order)
    return outcome

56 

57 

@buffer_orders_router.put("/{order_ids}/to-exchange")
async def put_order_to_exchange(
    order_ids: str,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    order: OrderUpdateModel,
) -> MultiselectError:
    """Accepts start_price(req), auction_end_time(must be in result) and assigned."""
    # The raw `order_ids` string and the update payload are handed to
    # `multiexec`, which invokes `on_put_to_exchange`; the id-string format is
    # defined by `multiexec` (not visible here -- confirm before relying on it).
    outcome = await multiexec(order_ids, on_put_to_exchange, user, order)
    return outcome

73 

74 

@buffer_orders_router.get("")
async def get_buffer_orders(
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    params: BaseGetListQueryParams = Depends(),
) -> GenericGetListResponse[OrderDBModel]:
    # List endpoint for the buffer: requires the company_manager role and
    # delegates the lookup, with the caller's list query parameters, to
    # `on_get_buffer`.
    listing = await on_get_buffer(user, params)
    return listing

88 

89 

@buffer_orders_router.get("/xlsx")
async def excel_export_buffer(
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    bg_task: BackgroundTasks,
    params: BaseGetListQueryParams = Depends(),
) -> FileResponse:
    # `export_buffer` produces a file and returns its path; that file is
    # served as the response body.
    exported_path = await export_buffer(user, params)
    # Schedule removal of the exported file as a background task so it is not
    # deleted before the response has been produced from it.
    bg_task.add_task(aios.remove, exported_path)
    return FileResponse(exported_path)

106 

107 

@buffer_orders_router.post(
    "",
    status_code=status.HTTP_201_CREATED,
    responses={400: {"model": ErrorRepr}},
)
async def insert_into_buffer(
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    order: OrderCreateModel,
) -> OrderDBModel:
    # Create endpoint: requires the company_manager role and passes the
    # request payload to `on_create_order`; replies with 201 on success and
    # documents a 400 error shape (`ErrorRepr`).
    created = await on_create_order(user, order)
    return created

125 

126 

@buffer_orders_router.get(
    "/{order_id}",
    responses={404: {"model": ErrorRepr}},
)
async def buffer_details(
    order_id: PydanticObjectIdPath,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    params: BaseGetOneQueryParams = Depends(),
) -> OrderDBModel:
    # Single-order lookup: requires the company_manager role and delegates to
    # `on_get_one_from_buffer`; a 404 error shape (`ErrorRepr`) is documented.
    found = await on_get_one_from_buffer(user, order_id, params)
    return found

141 

142 

@buffer_orders_router.put(
    "/{order_ids}/to-trash",
    responses={404: {"model": ErrorRepr}},
)
async def put_order_to_trash(
    order_ids: str,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
) -> MultiselectError:
    # The raw `order_ids` string is handed to `multiexec`, which invokes
    # `on_put_to_trash`; the id-string format is defined by `multiexec`
    # (not visible here -- confirm before relying on it).
    outcome = await multiexec(order_ids, on_put_to_trash, user)
    return outcome

158 

159 

@buffer_orders_router.patch(
    "/{order_id}",
    responses={400: {"model": ErrorRepr}},
)
async def update_order_data(
    order_id: PydanticObjectIdPath,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    order: OrderUpdateModel,
) -> OrderDBModel:
    # Update endpoint: requires the company_manager role and forwards the
    # order id and update payload to `on_update_order`; a 400 error shape
    # (`ErrorRepr`) is documented.
    updated = await on_update_order(user, order_id, order)
    return updated

176 

177 

@buffer_orders_router.post("/xlsx")
async def import_to_buffer(
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(required_role_names=[SotransRole.company_manager])
        ),
    ],
    xls_file: UploadFile = File(...),
) -> GenericGetListResponse[OrderDBModel]:
    # Read the whole upload into memory and wrap it in a seekable in-memory
    # stream, which is what gets passed to `on_import_to_buffer`.
    raw_bytes = await xls_file.read()
    workbook_stream = BytesIO(raw_bytes)
    return await on_import_to_buffer(workbook_stream, user)