Coverage for api/handlers/orders/exchange_orders.py: 15%

216 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import datetime 

3from datetime import timedelta 

4from typing import Any, Callable 

5 

6from bson import ObjectId 

7from database.entity import update_etag_and_text_search 

8from database.orders import ( 

9 confirm_order_and_return, 

10 get_confirmation_interval, 

11 get_reservation_interval, 

12 move_order_and_return, 

13 update_status_and_return, 

14) 

15from database.verification import can_apply_order 

16from exceptions import NotAcceptableHTTPError, NotFoundHTTPError 

17from handlers.authorization.check_role import has_role 

18from handlers.authorization.company_employee_access import ( 

19 assigned_subsidiary_or_logistician_for_order_query, 

20 flexible_company_assertion_query, 

21) 

22from handlers.authorization.confidential import ( 

23 hide_dict_fields_from_carrier, 

24 hide_model_fields_from_carrier, 

25) 

26from handlers.grabbers.orders import orders_data_grabber 

27from mongodb import bids_col, buf_col, orders_col, orgs_col, trash_orders_col 

28from operations.assignment import data_from_assigned, notify_if_assigned 

29from operations.orders import patch_operations 

30from pymongo import ReturnDocument 

31from services.notifications.director import notification_api 

32from services.recommendations import suggestion_api 

33from sotrans_models.models.orders.bid import BidDBModel 

34from sotrans_models.models.orders.order import ( 

35 OrderDBModel, 

36 OrderStatus, 

37 OrderUpdateModel, 

38 ResourceCheckOrderDBModel, 

39) 

40from sotrans_models.models.responses import GenericGetListResponse 

41from sotrans_models.models.roles import SotransRole 

42from sotrans_models.models.users import SotransOIDCUserModel 

43from sotrans_models.utils.text_mappers import get_orders_text_search 

44from utils.data_grabber import ( 

45 BaseGetListQueryParams, 

46 BaseGetOneQueryParams, 

47 MongoDataGrabber, 

48 adjust_search_query, 

49) 

50from utils.dt_utils import get_current_datetime 

51from utils.excel import export_orders_to_excel 

52from utils.helper import ( 

53 clean_empty_objects, 

54 etag_detalizer, 

55 get_org_oid, 

56 place_stop_indicies, 

57) 

58 

59 

async def on_exchange_update(
    order_id: ObjectId, order: OrderUpdateModel, user: SotransOIDCUserModel
) -> OrderDBModel | dict[str, Any]:
    """Apply a partial update to an order and refresh its derived fields.

    When the payload carries a carrier with an id, the carrier organization
    is loaded (without its ``documents`` field), embedded into the update and
    the order status is set to ``confirmed``.

    Args:
        order_id: Identifier of the order to update.
        order: Update payload; when ``order.etag`` is set it is added to the
            update filter.
        user: Caller; used to build the company restriction query.

    Returns:
        The updated order as an ``OrderDBModel``.

    Raises:
        NotFoundHTTPError: If the carrier organization does not exist, or no
            order matches the restriction/etag filter.
    """
    etag_q = {} if order.etag is None else {"etag": order.etag}
    place_stop_indicies(order)
    update_data = order.model_dump(exclude_none=True)
    clean_empty_objects(update_data)
    if order.carrier and order.carrier.id:
        # Look the organization up by the carrier's id, not by the carrier
        # object itself (the same lookup on_confirm_order performs).
        carrier = await orgs_col.find_single(
            "_id", order.carrier.id, projection={"documents": 0}
        )
        if not carrier:
            raise NotFoundHTTPError("organization")
        update_data["carrier"] = carrier
        update_data["status"] = OrderStatus.confirmed.value

    await patch_operations(update_data, order)
    managing_q = await flexible_company_assertion_query(user)
    order_filter = managing_q | {"id": order_id}
    updated_order = await orders_col.collection.find_one_and_update(
        order_filter | etag_q,
        {"$set": update_data},
        return_document=ReturnDocument.AFTER,
    )
    if not updated_order:
        # presumably reports an etag mismatch in more detail before the
        # generic not-found below — TODO confirm against etag_detalizer
        await etag_detalizer(orders_col, etag_q, order_filter)
        raise NotFoundHTTPError("order")
    await update_etag_and_text_search(
        updated_order,
        orders_col,
        OrderDBModel,
        get_orders_text_search,
    )
    up_model = OrderDBModel(**updated_order)
    # NOTE(review): this branch tests only `order.carrier`, while the status
    # is switched to confirmed above only when `order.carrier.id` is set —
    # confirm that a carrier without an id should still trigger the
    # "order confirmed" notification.
    if order.carrier:
        asyncio.create_task(
            suggestion_api.remove_order(order_id, target=False)
        )
        asyncio.create_task(notification_api.order_confirmed(up_model))
    else:
        asyncio.create_task(
            suggestion_api.update_order(order_id, up_model, target=False)
        )
    asyncio.create_task(
        suggestion_api.update_order(order_id, up_model, target=True)
    )
    notify_if_assigned(order, updated_order)
    return up_model

107 

108 

async def on_confirm_order(
    order_id: ObjectId,
    user: SotransOIDCUserModel,
    order: OrderUpdateModel | None,
) -> OrderDBModel | dict[str, Any]:
    """Confirm an order, either for an explicit carrier or from a reservation.

    If the payload names a carrier (``order.carrier.id``), the order is
    confirmed for that carrier through ``confirm_order_and_return`` using the
    carrier's lowest-value bid for this order. Otherwise the order must
    currently be ``reserved`` with an unexpired reservation; it is switched
    to ``confirmed`` and given a confirmation deadline.

    Args:
        order_id: Identifier of the order to confirm.
        user: Caller; used to build the company restriction query.
        order: Optional payload; may supply an etag, a carrier and the data
            consumed by ``data_from_assigned``.

    Returns:
        The confirmed order as an ``OrderDBModel``.

    Raises:
        NotFoundHTTPError: If the carrier organization does not exist, or no
            reserved order matches the restriction/etag filter.
    """
    etag_q = {"etag": order.etag} if order and order.etag else {}
    managing_query = await flexible_company_assertion_query(user)
    update_data = await data_from_assigned(order)

    if order and order.carrier and order.carrier.id:
        carrier_org = await orgs_col.find_single(
            "_id", order.carrier.id, projection={"documents": 0}
        )
        if not carrier_org:
            raise NotFoundHTTPError("carrier")

        # Ascending sort on "value": the carrier's lowest bid for the order.
        best_bid_data = await bids_col.collection.find_one(
            {
                BidDBModel.order_id: order_id,
                BidDBModel.carrier.id: order.carrier.id,
            },
            sort=[("value", 1)],
        )
        updated_order = await confirm_order_and_return(
            order_id,
            carrier_org,
            best_bid_data,
            managing_query | etag_q,
            update_data,
        )
        um = OrderDBModel(**updated_order)
        asyncio.create_task(notification_api.order_confirmed(um))
        notify_if_assigned(order, um)
        asyncio.create_task(
            suggestion_api.remove_order(order_id, target=False)
        )
        return um

    # Build the reservation restriction once so the failure diagnostics
    # below describe exactly the query that was executed (same timestamp).
    reserved_q = managing_query | {
        "id": order_id,
        OrderDBModel.status: OrderStatus.reserved.value,
        OrderDBModel.reservation_end_time: {
            "$gte": datetime.datetime.utcnow()
        },
    }
    up_order = await orders_col.collection.find_one_and_update(
        reserved_q | etag_q,
        {
            "$set": update_data
            | {
                OrderDBModel.status: OrderStatus.confirmed.value,
                OrderDBModel.confirmation_end_time: get_current_datetime()
                + timedelta(minutes=await get_confirmation_interval()),
            }
        },
        return_document=ReturnDocument.AFTER,
    )
    if not up_order:
        # presumably reports an etag mismatch in more detail before the
        # generic not-found below — TODO confirm against etag_detalizer
        await etag_detalizer(orders_col, etag_q, reserved_q)
        raise NotFoundHTTPError("order")

    # Refresh etag/text search before building the response model, the same
    # order on_exchange_update uses. NOTE(review): this assumes the helper
    # may update `up_order` in place — confirm.
    await update_etag_and_text_search(
        up_order, orders_col, OrderDBModel, get_orders_text_search
    )
    um = OrderDBModel(**up_order)
    asyncio.create_task(notification_api.order_confirmed(um))
    notify_if_assigned(order, um)
    asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
    return um

188 

189 

190def get_if_exchange_only(where: dict | None) -> bool: 

191 exch_only = False 

192 if where: 

193 if "status" not in where: 

194 exch_only = True 

195 elif where["status"] in ( 

196 OrderStatus.exchange.value, 

197 OrderStatus.reserved.value, 

198 ): 

199 pass 

200 else: 

201 exch_only = True 

202 else: 

203 exch_only = True 

204 return exch_only 

205 

206 

async def on_get_exchange(
    user: SotransOIDCUserModel, params: BaseGetListQueryParams
) -> GenericGetListResponse[OrderDBModel]:
    """List orders for the exchange view and attach the caller's own bids.

    When ``get_if_exchange_only`` says so, the status filter in
    ``params.where`` is adjusted to the exchange status and each returned
    order gets ``your_bid`` set to the bid placed by the caller's
    organization (or ``None``). Confidential fields are hidden on every
    returned order.

    Args:
        user: Caller; determines the organization whose bids are attached.
        params: List query parameters; ``params.where`` may be rewritten.

    Returns:
        The list response produced by ``orders_data_grabber.get_list``.
    """
    parsed_where = MongoDataGrabber.parse_where(params.where)
    exchange_only = get_if_exchange_only(parsed_where)
    if exchange_only:
        params.where = adjust_search_query(
            params.where, OrderDBModel.status, {OrderStatus.exchange.value}
        )

    listing = await orders_data_grabber.get_list(params, user)

    bids_by_order = {}
    if exchange_only:
        carrier_oid = get_org_oid(user)
        listed_ids = [ObjectId(item.id) for item in listing.items]
        bid_docs = await bids_col.find_batch(
            {
                BidDBModel.carrier.id: carrier_oid,
                BidDBModel.order_id: {"$in": listed_ids},
            }
        )
        # Index the organization's bids by the order they belong to.
        bids_by_order = {
            doc[BidDBModel.order_id]: BidDBModel(**doc) for doc in bid_docs
        }

    for item in listing.items:
        hide_model_fields_from_carrier(item)
        if exchange_only:
            item.your_bid = bids_by_order.get(ObjectId(item.id))
    return listing

236 

237 

async def on_get_my_bids(
    user: SotransOIDCUserModel, params: BaseGetListQueryParams
):
    """List only the orders on which the caller's organization has a bid.

    The order query is run with ``limit`` and ``skip`` reset to 0 on
    ``params``; the requested ``skip``/``limit`` are then applied in memory
    to the orders that have a bid.

    Args:
        user: Caller; determines the organization whose bids are matched.
        params: List query parameters; ``where``, ``limit`` and ``skip`` are
            modified on this object.

    Returns:
        A ``GenericGetListResponse[OrderDBModel]`` with the requested page in
        ``items`` and the number of all orders with a bid in ``total``.
    """
    parsed_where = MongoDataGrabber.parse_where(params.where)
    if get_if_exchange_only(parsed_where):
        params.where = adjust_search_query(
            params.where, OrderDBModel.status, {OrderStatus.exchange.value}
        )

    # Remember the requested page, then query without paging.
    limit = params.limit
    params.limit = 0
    skip = params.skip
    params.skip = 0
    listing = await orders_data_grabber.get_list(params, user)

    carrier_oid = get_org_oid(user)
    listed_ids = [ObjectId(item.id) for item in listing.items]
    bid_docs = await bids_col.find_batch(
        {
            BidDBModel.carrier.id: carrier_oid,
            BidDBModel.order_id: {"$in": listed_ids},
        }
    )
    bids_by_order = {
        doc[BidDBModel.order_id]: BidDBModel(**doc) for doc in bid_docs
    }

    with_bids: list[OrderDBModel] = []
    for item in listing.items:
        # Fields are hidden on every order, including those dropped below.
        hide_model_fields_from_carrier(item)
        own_bid = bids_by_order.get(ObjectId(item.id))
        if own_bid is not None:
            item.your_bid = own_bid
            with_bids.append(item)

    # A falsy skip means "from the start"; a falsy limit means "to the end".
    start = skip if skip else 0
    stop = start + limit if limit else None
    page = with_bids[start:stop]
    return GenericGetListResponse[OrderDBModel](
        items=page, total=len(with_bids)
    )

282 

283 

async def on_order_through_auction(
    order_id: ObjectId,
    user: SotransOIDCUserModel,
    order: OrderUpdateModel | None = None,
) -> OrderDBModel:
    """Reserve an order for the owner of its lowest-value bid.

    The order is switched to ``reserved``, given a reservation deadline, the
    winning carrier and the winning bid value as ``end_price``.

    Args:
        order_id: Identifier of the order whose auction is being closed.
        user: Caller; users with the ``bid_service`` role are not restricted
            by the company query.
        order: Optional payload consumed by ``data_from_assigned``.

    Returns:
        The order as stored after the reservation update.

    Raises:
        NotAcceptableHTTPError: If the order has no bids.
        NotFoundHTTPError: If no order matches the restriction filter.
    """
    if has_role(user, SotransRole.bid_service):
        managing_query = {}
    else:
        managing_query = await flexible_company_assertion_query(user)

    # Ascending sort on "value": the lowest bid wins.
    best_bid_data = await bids_col.collection.find_one(
        {BidDBModel.order_id: order_id}, sort=[("value", 1)]
    )

    if not best_bid_data:
        raise NotAcceptableHTTPError("No auction")
    best_bid = BidDBModel(**best_bid_data)
    best_bid_owner_org_id = get_org_oid(best_bid.owner)
    # Prefer the carrier stored on the bid; otherwise load the bid owner's
    # organization. NOTE(review): a missing organization yields carrier=None
    # here, unlike the other handlers that raise NotFoundHTTPError — confirm.
    if best_bid.carrier:
        carrier_org = best_bid.carrier.model_dump()
    else:
        carrier_org = await orgs_col.find_single(
            "_id", best_bid_owner_org_id, projection={"documents": 0}
        )
    update_data = await data_from_assigned(order)
    up_order = await orders_col.collection.find_one_and_update(
        managing_query | {"id": order_id},
        {
            "$set": update_data
            | {
                OrderDBModel.status: OrderStatus.reserved.value,
                OrderDBModel.reservation_end_time: get_current_datetime()
                + timedelta(minutes=await get_reservation_interval()),
                OrderDBModel.carrier: carrier_org,
                OrderDBModel.end_price: best_bid.value,
            },
        },
        # Without this the pre-update document is returned, so the response
        # and the notifications would miss the new status, carrier and price.
        return_document=ReturnDocument.AFTER,
    )
    if up_order is None:
        raise NotFoundHTTPError("заказ")
    # Refresh etag/text search before building the response model, the same
    # order on_exchange_update uses. NOTE(review): this assumes the helper
    # may update `up_order` in place — confirm.
    await update_etag_and_text_search(
        up_order, orders_col, OrderDBModel, get_orders_text_search
    )
    om = OrderDBModel(**up_order)
    asyncio.create_task(notification_api.order_reserved(om))
    # NOTE(review): these two are called without await/create_task, unlike
    # order_reserved above — confirm they are synchronous.
    notification_api.company_order_reserved(om)
    notification_api.company_auction_ended(om)
    notify_if_assigned(order, up_order)
    asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
    return om

335 

336 

async def on_get_exchange_order_by_id(
    user: SotransOIDCUserModel,
    order_id: ObjectId,
    params: BaseGetOneQueryParams,
) -> dict[str, Any] | OrderDBModel | ResourceCheckOrderDBModel:
    """Fetch one order for the exchange view.

    Orders in ``exchange`` or ``reserved`` status get ``your_bid`` filled
    with the bid of the caller's organization, when one exists. An order in
    ``exchange`` status is returned as a ``ResourceCheckOrderDBModel`` with
    ``resource_check_passing`` computed by ``can_apply_order``. Callers
    without the ``company_logistician`` role get confidential fields hidden.

    Args:
        user: Caller; company logisticians are additionally restricted by
            the company query.
        order_id: Identifier of the order to fetch.
        params: Single-item query parameters passed to the data grabber.

    Returns:
        The order model, or its resource-check variant for exchange orders.
    """
    is_company = has_role(user, SotransRole.company_logistician)
    restriction_q = (
        dict(await flexible_company_assertion_query(user))
        if is_company
        else {}
    )
    org_id = get_org_oid(user)

    async def attach_company_bid(order: OrderDBModel):
        # Bids are only looked up for orders that are still on the exchange
        # or reserved.
        if order.status not in (OrderStatus.exchange, OrderStatus.reserved):
            return
        bid_doc = await bids_col.collection.find_one(
            {BidDBModel.carrier.id: org_id, BidDBModel.order_id: order_id}
        )
        if bid_doc:
            order.your_bid = BidDBModel(**bid_doc)

    processors: list[Callable] = [attach_company_bid]
    processed: OrderDBModel = (
        await orders_data_grabber.get_one_by_id_with_pattern(
            order_id,
            params,
            restriction_q,
            processors,
        )
    )

    if processed.status == OrderStatus.exchange:
        result = ResourceCheckOrderDBModel(**processed.model_dump())
        result.resource_check_passing = await can_apply_order(
            org_id, result.truck_body
        )
    else:
        result = processed

    if not is_company:
        hide_model_fields_from_carrier(result)
    return result

378 

379 

async def get_order_with_resource_check(
    order_id: ObjectId,
    params: BaseGetOneQueryParams,
    user: SotransOIDCUserModel,
) -> dict[str, Any] | ResourceCheckOrderDBModel:
    """Fetch an order and compute whether the caller's organization can apply.

    Confidential fields are hidden for callers without the
    ``company_logistician`` role.

    Args:
        order_id: Identifier of the order to fetch.
        params: Single-item query parameters passed to the data grabber.
        user: Caller; their organization is used for the resource check.

    Returns:
        The order as ``ResourceCheckOrderDBModel`` with
        ``resource_check_passing`` set from ``can_apply_order``.

    Raises:
        NotFoundHTTPError: If the caller's organization is not found.
    """
    order_doc = await orders_data_grabber.get_one(order_id, params)
    caller_is_logistician = has_role(user, SotransRole.company_logistician)
    if not caller_is_logistician:
        hide_dict_fields_from_carrier(order_doc)

    # Only the organization's _id is needed, hence the narrow projection.
    org_doc = await orgs_col.find_single(
        "_id", get_org_oid(user), projection={"_id": 1}
    )
    if org_doc is None:
        raise NotFoundHTTPError("пользователь не в организации")

    checked = ResourceCheckOrderDBModel(**order_doc)
    checked.resource_check_passing = await can_apply_order(
        org_doc["_id"], checked.truck_body
    )
    return checked

397 

398 

async def export_order(
    user: SotransOIDCUserModel, params: BaseGetListQueryParams
) -> str:
    """Export the orders matching ``params`` through ``export_orders_to_excel``.

    The second argument passed to the exporter is ``True`` for callers
    without the ``company_logistician`` role.

    Returns:
        The string returned by ``export_orders_to_excel``.
    """
    listing = await orders_data_grabber.get_list(params, user)
    is_logistician = has_role(user, SotransRole.company_logistician)
    return await export_orders_to_excel(listing.items, not is_logistician)

405 

406 

async def on_order_to_appointment(
    order_id: ObjectId,
    user: SotransOIDCUserModel,
    order: OrderUpdateModel | None,
) -> dict[str, Any] | OrderDBModel:
    """Move an order to the ``appointment`` status.

    After the status update the order is removed from the suggestion service
    for both ``target`` values.

    Args:
        order_id: Identifier of the order to move.
        user: Caller; used to build the company restriction query.
        order: Optional payload; may supply an etag and the data consumed by
            ``data_from_assigned``.

    Returns:
        Whatever ``update_status_and_return`` returns for the updated order.
    """
    assigned_updates = await data_from_assigned(order)
    company_scope = await flexible_company_assertion_query(user)
    etag_filter = {}
    if order and order.etag:
        etag_filter = {"etag": order.etag}

    result = await update_status_and_return(
        order_id,
        OrderStatus.appointment,
        set_=assigned_updates,
        restriction=company_scope | etag_filter,
    )
    for target in (False, True):
        asyncio.create_task(
            suggestion_api.remove_order(order_id, target=target)
        )
    notify_if_assigned(order, result)
    return result

425 

426 

async def on_put_exchange_to_buffer(
    order_id: ObjectId, order: OrderUpdateModel
) -> dict[str, Any] | OrderDBModel:
    """Move an order from the orders collection to the buffer collection.

    Removal from the suggestion service (both ``target`` values) is
    scheduled first; the order is then moved with status ``buffer`` and the
    updates produced by ``data_from_assigned``.

    Args:
        order_id: Identifier of the order to move.
        order: Payload consumed by ``data_from_assigned``.

    Returns:
        The moved order as an ``OrderDBModel``.
    """
    for target in (True, False):
        asyncio.create_task(
            suggestion_api.remove_order(order_id, target=target)
        )
    assigned_updates = await data_from_assigned(order)
    moved_doc = await move_order_and_return(
        order_id,
        orders_col,
        buf_col,
        OrderStatus.buffer,
        updates=assigned_updates,
    )
    moved = OrderDBModel(**moved_doc)
    notify_if_assigned(order, moved)
    asyncio.create_task(notification_api.new_in_buffer(moved))
    return moved

440 

441 

async def on_put_exchange_to_trash(
    order_id: ObjectId, user: SotransOIDCUserModel
):
    """Move an exchange order to the trash collection as ``archived``.

    Removal from the suggestion service (both ``target`` values) is
    scheduled first. Only an order currently in ``exchange`` status and
    matching the caller's restriction query is moved; ``deleted_at`` is set
    to the current UTC time. Nothing is returned.

    Args:
        order_id: Identifier of the order to trash.
        user: Caller; used to build the restriction query.
    """
    # NOTE(review): this query builder is not awaited, unlike
    # flexible_company_assertion_query elsewhere in this module — confirm it
    # is synchronous.
    access_scope = assigned_subsidiary_or_logistician_for_order_query(user)
    for target in (True, False):
        asyncio.create_task(
            suggestion_api.remove_order(order_id, target=target)
        )
    exchange_only_scope = access_scope | {
        OrderDBModel.status: OrderStatus.exchange.value
    }
    deletion_stamp = {OrderDBModel.deleted_at: datetime.datetime.utcnow()}
    await move_order_and_return(
        order_id,
        orders_col,
        trash_orders_col,
        OrderStatus.archived,
        exchange_only_scope,
        deletion_stamp,
    )