Coverage for api/handlers/orders/orders.py: 14%

156 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2from datetime import timedelta 

3from typing import Any 

4 

5import motor 

6from bson import ObjectId 

7from database.entity import update_etag_and_text_search 

8from database.orders import ( 

9 confirm_order_and_return, 

10 get_confirmation_interval, 

11 update_status_and_return, 

12) 

13from exceptions import ( 

14 BadParameterHTTPError, 

15 NotAcceptableHTTPError, 

16 NotFoundHTTPError, 

17) 

18from handlers.authorization.company_employee_access import ( 

19 assigned_subsidiary_or_logistician_for_order_query, 

20 flexible_company_assertion_query, 

21) 

22from mongodb import orders_col, orgs_col, trash_orders_col 

23from operations.assignment import data_from_assigned, notify_if_assigned 

24from operations.orders import ( 

25 clear_documents, 

26 patch_operations, 

27 pre_process_to_exchange_auction_end_time, 

28 update_documents_data, 

29) 

30from pymongo import ReturnDocument 

31from pymongo.results import UpdateResult 

32from services.notifications.director import notification_api 

33from services.recommendations import suggestion_api 

34from sotrans_models.models.orders.order import ( 

35 OrderDBModel, 

36 OrderStatus, 

37 OrderUpdateModel, 

38) 

39from sotrans_models.models.users import SotransOIDCUserModel 

40from sotrans_models.utils.text_mappers import get_orders_text_search 

41from utils.dt_utils import get_current_datetime 

42from utils.helper import ( 

43 add_prices_to_update, 

44 check_assigned, 

45 clean_empty_objects, 

46 etag_detalizer, 

47) 

48 

49 

50async def on_orders_update( 

51 order_id: ObjectId, 

52 order: OrderUpdateModel, 

53) -> dict[str, Any] | OrderDBModel: 

54 order_data = await orders_col.find_single("id", order_id) 

55 if not order_data: 

56 raise NotFoundHTTPError("заказ") 

57 update_data = order.model_dump(exclude_unset=True) 

58 if order.carrier and order.carrier.id: 

59 carrier = await orgs_col.find_single( 

60 "_id", order.carrier.id, projection={"documents": 0} 

61 ) 

62 if not carrier: 

63 raise NotFoundHTTPError("organization") 

64 update_data["carrier"] = carrier 

65 update_data["status"] = OrderStatus.confirmed.value 

66 update_data[ 

67 OrderDBModel.confirmation_end_time 

68 ] = get_current_datetime() + timedelta(hours=1) 

69 add_prices_to_update( 

70 order_data, update_data, order.start_price, order.end_price 

71 ) 

72 clean_empty_objects(update_data) 

73 await update_documents_data(order_id, order, update_data, order_data) 

74 await patch_operations(update_data, order) 

75 etag_q = {"etag": order.etag} if order.etag else {} 

76 updated_order = await orders_col.collection.find_one_and_update( 

77 { 

78 "id": order_id, 

79 } 

80 | etag_q, 

81 {"$set": update_data}, 

82 return_document=ReturnDocument.AFTER, 

83 ) 

84 if not updated_order: 

85 await etag_detalizer( 

86 orders_col, 

87 etag_q, 

88 { 

89 "id": order_id, 

90 }, 

91 ) 

92 raise NotFoundHTTPError("заказ") 

93 await update_etag_and_text_search( 

94 updated_order, orders_col, OrderDBModel, get_orders_text_search 

95 ) 

96 up_model = OrderDBModel(**updated_order) 

97 if order.carrier and order.carrier.id: 

98 asyncio.create_task( 

99 suggestion_api.remove_order(order_id, target=False) 

100 ) 

101 asyncio.create_task(notification_api.order_confirmed(up_model)) 

102 else: 

103 asyncio.create_task( 

104 suggestion_api.update_order(order_id, up_model, target=False) 

105 ) 

106 asyncio.create_task( 

107 suggestion_api.update_order(order_id, up_model, target=True) 

108 ) 

109 notify_if_assigned(order, up_model) 

110 return up_model 

111 

112 

113def check_assignment_parameter(order: OrderUpdateModel): 

114 if not (order.assigned and order.assigned.company): 

115 raise BadParameterHTTPError("assigned") 

116 assigned = order.assigned 

117 if not ( 

118 "employee" in assigned.company.model_fields_set 

119 or "subsidiary" in assigned.company.model_fields_set 

120 ): 

121 raise BadParameterHTTPError("assigned not entered") 

122 

123 

124async def on_company_assignment( 

125 order_id, order: OrderUpdateModel 

126) -> dict[str, Any] | OrderDBModel: 

127 check_assignment_parameter(order) 

128 update_data = await data_from_assigned(order) 

129 etag_q = {} if order.etag is None else {"etag": order.etag} 

130 order = await orders_col.collection.find_one_and_update( 

131 {"id": order_id} | etag_q, 

132 {"$set": update_data}, 

133 return_document=ReturnDocument.AFTER, 

134 ) 

135 if not order: 

136 await etag_detalizer(orders_col, etag_q, {"id": order_id}) 

137 raise NotFoundHTTPError("order") 

138 await update_etag_and_text_search( 

139 order, orders_col, OrderDBModel, get_orders_text_search 

140 ) 

141 ao = OrderDBModel(**order) 

142 notification_api.assignment(ao) 

143 return order 

144 

145 

146async def on_assign_carrier( 

147 order_id: ObjectId, 

148 user: SotransOIDCUserModel, 

149 order: OrderUpdateModel, 

150) -> OrderDBModel | None: 

151 etag_q = {"etag": order.etag} if order.etag else {} 

152 if order.carrier and order.carrier.id: 

153 carrier_data = await orgs_col.find_single( 

154 "_id", order.carrier.id, projection={"documents": 0} 

155 ) 

156 if not carrier_data: 

157 raise NotFoundHTTPError("перевозчик") 

158 

159 assigned_order_q = assigned_subsidiary_or_logistician_for_order_query( 

160 user 

161 ) 

162 update_data = await data_from_assigned(order) 

163 order_in = await orders_col.find_single("id", order_id) 

164 if order_in is None: 

165 raise NotFoundHTTPError("заказ") 

166 if (order_in.get(OrderDBModel.carrier) or {}).get( 

167 "id" 

168 ) == order.carrier.id: 

169 raise BadParameterHTTPError("перевозчики совпадают") 

170 if not (order_in.get(OrderDBModel.end_price) or order.end_price): 

171 raise BadParameterHTTPError("конечная цена") 

172 to_renew = False 

173 order_status_str: str | None = order_in.get(OrderDBModel.status) 

174 if order_status_str in ( 

175 ost.value 

176 for ost in ( 

177 OrderStatus.unverified, 

178 OrderStatus.active, 

179 OrderStatus.confirmed, 

180 ) 

181 ): 

182 to_renew = True 

183 update_data["end_price"] = order.end_price 

184 add_prices_to_update( 

185 order_in, update_data, order.start_price, order.end_price 

186 ) 

187 if to_renew is True: 

188 update_data |= { 

189 OrderDBModel.driver: None, 

190 OrderDBModel.trailer: None, 

191 OrderDBModel.truck: None, 

192 OrderDBModel.document_draft: None, 

193 OrderDBModel.documents[0]: None, 

194 } 

195 carrier_added = await confirm_order_and_return( 

196 order_id, 

197 carrier_data, 

198 restriction_query=assigned_order_q | etag_q, 

199 update_data=update_data, 

200 ) 

201 await clear_documents(order_in.get("documents")) 

202 is_assigned = check_assigned(order) 

203 order_with_new_carrier = OrderDBModel(**carrier_added) 

204 if is_assigned: 

205 notification_api.assignment(order_with_new_carrier) 

206 asyncio.create_task( 

207 notification_api.order_confirmed(order_with_new_carrier) 

208 ) 

209 asyncio.create_task( 

210 notification_api.order_canceled(OrderDBModel(**order_in)) 

211 ) 

212 return carrier_added 

213 update_search_q = { 

214 "id": order_id, 

215 OrderDBModel.status: OrderStatus.canceled.value, 

216 OrderDBModel.carrier: {"$ne": None}, 

217 } 

218 updated: UpdateResult = await orders_col.collection.update_one( 

219 update_search_q | etag_q, 

220 { 

221 "$set": { 

222 OrderDBModel.status: OrderStatus.confirmed.value, 

223 OrderDBModel.confirmation_end_time: get_current_datetime() 

224 + timedelta(minutes=await get_confirmation_interval()), 

225 } 

226 }, 

227 ) 

228 if updated.matched_count == 0: 

229 await etag_detalizer(orders_col, etag_q, update_search_q) 

230 raise NotFoundHTTPError("заказ") 

231 return None 

232 

233 

234async def on_multi_trash(ids: list[ObjectId]) -> bool: 

235 orders = await orders_col.find_batch( 

236 {"id": {"$in": ids}}, projection={"_id": 0} 

237 ) 

238 motor_trash: motor.MotorCollection = trash_orders_col.collection 

239 inserted = await motor_trash.insert_many(orders) 

240 rm_tasks = [suggestion_api.remove_order(i, target=True) for i in ids] 

241 rm_tasks.extend(suggestion_api.remove_order(i, target=False) for i in ids) 

242 asyncio.gather(*rm_tasks) 

243 await orders_col.collection.delete_many({"_id": {"$in": ids}}) 

244 return inserted.acknowledged 

245 

246 

247async def on_to_exchange( 

248 order_id: ObjectId, 

249 order: OrderUpdateModel | None, 

250 user: SotransOIDCUserModel, 

251): 

252 etag_q = {"etag": order.etag} if order and order.etag else {} 

253 update_data = await data_from_assigned(order) 

254 restriction_q = await flexible_company_assertion_query(user) 

255 order_data = await orders_col.collection.find_one( 

256 {"id": order_id} | restriction_q | etag_q 

257 ) 

258 if order_data is None: 

259 await etag_detalizer( 

260 orders_col, etag_q, {"id": order_id} | restriction_q 

261 ) 

262 raise NotFoundHTTPError("заказ") 

263 had_status = order_data.get(OrderDBModel.status) 

264 if had_status in (OrderStatus.active, OrderStatus.exchange): 

265 raise NotAcceptableHTTPError("неподходящий статус") 

266 update_start_price = order and order.start_price 

267 if not (order_data.get(OrderDBModel.start_price) or update_start_price): 

268 raise BadParameterHTTPError("нет начальной цены") 

269 order_start_price_q = ( 

270 {OrderDBModel.start_price: update_start_price} 

271 if update_start_price 

272 else {} 

273 ) 

274 

275 pre_process_to_exchange_auction_end_time(order_data, order, update_data) 

276 

277 add_prices_to_update( 

278 order_data, 

279 update_data, 

280 order and order.start_price, 

281 order and order.end_price, 

282 ) 

283 order_up = await update_status_and_return( 

284 order_id, 

285 OrderStatus.exchange, 

286 set_=update_data | order_start_price_q, 

287 restriction=restriction_q | etag_q, 

288 ) 

289 if not order_up: 

290 raise NotFoundHTTPError("заказ") 

291 order_model = OrderDBModel(**order_up) 

292 notify_if_assigned(order, order_model) 

293 

294 # Suggestion flow 

295 # From appointment - add target and rec 

296 # From confirmed, unverified, reserved, canceled - we add rec for existing target 

297 

298 if had_status == OrderStatus.appointment: 

299 asyncio.create_task( 

300 suggestion_api.create_target_and_recommendation_order(order_model) 

301 ) 

302 asyncio.create_task( 

303 suggestion_api.create_target_and_recommendation_order( 

304 order_model, target=False 

305 ) 

306 ) 

307 

308 

309async def on_to_appointment( 

310 order_id: ObjectId, 

311 order: OrderUpdateModel | None, 

312 user: SotransOIDCUserModel, 

313): 

314 prev_order_status_container = await orders_col.find_single( 

315 "id", order, projection={OrderDBModel.status: 1} 

316 ) 

317 if prev_order_status_container is None: 

318 raise NotFoundHTTPError("заказ") 

319 prev_order_status_value = prev_order_status_container.get( 

320 OrderDBModel.status 

321 ) 

322 if prev_order_status_value in ( 

323 OrderStatus.active, 

324 OrderStatus.appointment, 

325 ): 

326 raise NotAcceptableHTTPError("неподходящий статус") 

327 update_data = await data_from_assigned(order) 

328 managing_clients_query = await flexible_company_assertion_query(user) 

329 etag_q = {"etag": order.etag} if order and order.etag else {} 

330 up_order = await update_status_and_return( 

331 order_id, 

332 OrderStatus.appointment, 

333 set_=update_data, 

334 restriction=managing_clients_query | etag_q, 

335 ) 

336 

337 # Suggestion flow 

338 # for exchange - rm from target and rec 

339 # from confirmed, reserved, unverified, canceled - rm from target (as it is not in recommendations) 

340 

341 if prev_order_status_value == OrderStatus.exchange: 

342 asyncio.create_task( 

343 suggestion_api.remove_order(order_id, target=False) 

344 ) 

345 asyncio.create_task(suggestion_api.remove_order(order_id, target=True)) 

346 notify_if_assigned(order, up_order)