Coverage for api/handlers/orders/exchange_orders.py: 15%
216 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import datetime
3from datetime import timedelta
4from typing import Any, Callable
6from bson import ObjectId
7from database.entity import update_etag_and_text_search
8from database.orders import (
9 confirm_order_and_return,
10 get_confirmation_interval,
11 get_reservation_interval,
12 move_order_and_return,
13 update_status_and_return,
14)
15from database.verification import can_apply_order
16from exceptions import NotAcceptableHTTPError, NotFoundHTTPError
17from handlers.authorization.check_role import has_role
18from handlers.authorization.company_employee_access import (
19 assigned_subsidiary_or_logistician_for_order_query,
20 flexible_company_assertion_query,
21)
22from handlers.authorization.confidential import (
23 hide_dict_fields_from_carrier,
24 hide_model_fields_from_carrier,
25)
26from handlers.grabbers.orders import orders_data_grabber
27from mongodb import bids_col, buf_col, orders_col, orgs_col, trash_orders_col
28from operations.assignment import data_from_assigned, notify_if_assigned
29from operations.orders import patch_operations
30from pymongo import ReturnDocument
31from services.notifications.director import notification_api
32from services.recommendations import suggestion_api
33from sotrans_models.models.orders.bid import BidDBModel
34from sotrans_models.models.orders.order import (
35 OrderDBModel,
36 OrderStatus,
37 OrderUpdateModel,
38 ResourceCheckOrderDBModel,
39)
40from sotrans_models.models.responses import GenericGetListResponse
41from sotrans_models.models.roles import SotransRole
42from sotrans_models.models.users import SotransOIDCUserModel
43from sotrans_models.utils.text_mappers import get_orders_text_search
44from utils.data_grabber import (
45 BaseGetListQueryParams,
46 BaseGetOneQueryParams,
47 MongoDataGrabber,
48 adjust_search_query,
49)
50from utils.dt_utils import get_current_datetime
51from utils.excel import export_orders_to_excel
52from utils.helper import (
53 clean_empty_objects,
54 etag_detalizer,
55 get_org_oid,
56 place_stop_indicies,
57)
60async def on_exchange_update(
61 order_id: ObjectId, order: OrderUpdateModel, user: SotransOIDCUserModel
62) -> OrderDBModel | dict[str, Any]:
63 etag_q = {} if order.etag is None else {"etag": order.etag}
64 place_stop_indicies(order)
65 update_data = order.model_dump(exclude_none=True)
66 clean_empty_objects(update_data)
67 if order.carrier and order.carrier.id:
68 carrier = await orgs_col.find_single(
69 "_id", order.carrier, projection={"documents": 0}
70 )
71 if not carrier:
72 raise NotFoundHTTPError("organization")
73 update_data["carrier"] = carrier
74 update_data["status"] = OrderStatus.confirmed.value
76 await patch_operations(update_data, order)
77 managing_q = await flexible_company_assertion_query(user)
78 updated_order = await orders_col.collection.find_one_and_update(
79 managing_q | {"id": order_id} | etag_q,
80 {"$set": update_data},
81 return_document=ReturnDocument.AFTER,
82 )
83 if not updated_order:
84 await etag_detalizer(orders_col, etag_q, managing_q | {"id": order_id})
85 raise NotFoundHTTPError("order")
86 await update_etag_and_text_search(
87 updated_order,
88 orders_col,
89 OrderDBModel,
90 get_orders_text_search,
91 )
92 up_model = OrderDBModel(**updated_order)
93 if order.carrier:
94 asyncio.create_task(
95 suggestion_api.remove_order(order_id, target=False)
96 )
97 asyncio.create_task(notification_api.order_confirmed(up_model))
98 else:
99 asyncio.create_task(
100 suggestion_api.update_order(order_id, up_model, target=False)
101 )
102 asyncio.create_task(
103 suggestion_api.update_order(order_id, up_model, target=True)
104 )
105 notify_if_assigned(order, updated_order)
106 return up_model
109async def on_confirm_order(
110 order_id: ObjectId,
111 user: SotransOIDCUserModel,
112 order: OrderUpdateModel | None,
113) -> OrderDBModel | dict[str, Any]:
114 etag_q = {"etag": order.etag} if order and order.etag else {}
115 managing_query = await flexible_company_assertion_query(user)
116 update_data = await data_from_assigned(order)
117 if order and order.carrier and order.carrier.id:
118 carrier_org = await orgs_col.find_single(
119 "_id", order.carrier.id, projection={"documents": 0}
120 )
121 if not carrier_org:
122 raise NotFoundHTTPError("carrier")
124 best_bid_data = await bids_col.collection.find_one(
125 {
126 BidDBModel.order_id: order_id,
127 BidDBModel.carrier.id: order.carrier.id,
128 },
129 sort=[("value", 1)],
130 )
131 updated_order = await confirm_order_and_return(
132 order_id,
133 carrier_org,
134 best_bid_data,
135 managing_query | etag_q,
136 update_data,
137 )
138 um = OrderDBModel(**updated_order)
139 asyncio.create_task(notification_api.order_confirmed(um))
140 notify_if_assigned(order, um)
141 asyncio.create_task(
142 suggestion_api.remove_order(order_id, target=False)
143 )
144 return um
146 up_order = await orders_col.collection.find_one_and_update(
147 managing_query
148 | etag_q
149 | {
150 "id": order_id,
151 OrderDBModel.status: OrderStatus.reserved.value,
152 OrderDBModel.reservation_end_time: {
153 "$gte": datetime.datetime.utcnow()
154 },
155 },
156 {
157 "$set": update_data
158 | {
159 OrderDBModel.status: OrderStatus.confirmed.value,
160 OrderDBModel.confirmation_end_time: get_current_datetime()
161 + timedelta(minutes=await get_confirmation_interval()),
162 }
163 },
164 return_document=ReturnDocument.AFTER,
165 )
166 if not up_order:
167 restriction_query = managing_query | {
168 "id": order_id,
169 OrderDBModel.status: OrderStatus.reserved.value,
170 OrderDBModel.reservation_end_time: {
171 "$gte": datetime.datetime.utcnow()
172 },
173 }
174 await etag_detalizer(orders_col, etag_q, restriction_query)
176 raise NotFoundHTTPError("order")
178 um = OrderDBModel(**up_order)
179 await update_etag_and_text_search(
180 up_order, orders_col, OrderDBModel, get_orders_text_search
181 )
182 asyncio.create_task(notification_api.order_confirmed(um))
183 notify_if_assigned(order, um)
185 asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
187 return um
190def get_if_exchange_only(where: dict | None) -> bool:
191 exch_only = False
192 if where:
193 if "status" not in where:
194 exch_only = True
195 elif where["status"] in (
196 OrderStatus.exchange.value,
197 OrderStatus.reserved.value,
198 ):
199 pass
200 else:
201 exch_only = True
202 else:
203 exch_only = True
204 return exch_only
207async def on_get_exchange(
208 user: SotransOIDCUserModel, params: BaseGetListQueryParams
209) -> GenericGetListResponse[OrderDBModel]:
210 where = MongoDataGrabber.parse_where(params.where)
211 exch_only = get_if_exchange_only(where)
212 if exch_only:
213 params.where = adjust_search_query(
214 params.where, OrderDBModel.status, {OrderStatus.exchange.value}
215 )
217 orders = await orders_data_grabber.get_list(params, user)
218 org_bids = {}
219 if exch_only:
220 company_bids_data = await bids_col.find_batch(
221 {
222 BidDBModel.carrier.id: get_org_oid(user),
223 BidDBModel.order_id: {
224 "$in": [ObjectId(o.id) for o in orders.items]
225 },
226 }
227 )
228 org_bids = {
229 b[BidDBModel.order_id]: BidDBModel(**b) for b in company_bids_data
230 }
231 for order in orders.items:
232 hide_model_fields_from_carrier(order)
233 if exch_only:
234 order.your_bid = org_bids.get(ObjectId(order.id))
235 return orders
238async def on_get_my_bids(
239 user: SotransOIDCUserModel, params: BaseGetListQueryParams
240):
241 where = MongoDataGrabber.parse_where(params.where)
242 exch_only = get_if_exchange_only(where)
243 if exch_only:
244 params.where = adjust_search_query(
245 params.where, OrderDBModel.status, {OrderStatus.exchange.value}
246 )
247 limit = params.limit
248 params.limit = 0
249 skip = params.skip
250 params.skip = 0
251 orders = await orders_data_grabber.get_list(params, user)
252 company_bids_data = await bids_col.find_batch(
253 {
254 BidDBModel.carrier.id: get_org_oid(user),
255 BidDBModel.order_id: {
256 "$in": [ObjectId(o.id) for o in orders.items]
257 },
258 }
259 )
260 org_bids = {
261 b[BidDBModel.order_id]: BidDBModel(**b) for b in company_bids_data
262 }
263 orders_with_bid: list[OrderDBModel] = []
264 for order in orders.items:
265 hide_model_fields_from_carrier(order)
266 your_bid = org_bids.get(ObjectId(order.id))
267 if your_bid is None:
268 continue
269 order.your_bid = your_bid
270 orders_with_bid.append(order)
271 if not (limit or skip):
272 items = orders_with_bid
273 elif not limit:
274 items = orders_with_bid[skip:]
275 elif not skip:
276 items = orders_with_bid[:limit]
277 else:
278 items = orders_with_bid[skip : skip + limit]
279 return GenericGetListResponse[OrderDBModel](
280 items=items, total=len(orders_with_bid)
281 )
284async def on_order_through_auction(
285 order_id: ObjectId,
286 user: SotransOIDCUserModel,
287 order: OrderUpdateModel | None = None,
288) -> OrderDBModel:
289 if has_role(user, SotransRole.bid_service):
290 managing_query = {}
291 else:
292 managing_query = await flexible_company_assertion_query(user)
294 best_bid_data = await bids_col.collection.find_one(
295 {BidDBModel.order_id: order_id}, sort=[("value", 1)]
296 )
298 if not best_bid_data:
299 raise NotAcceptableHTTPError("No auction")
300 best_bid = BidDBModel(**best_bid_data)
301 best_bid_owner_org_id = get_org_oid(best_bid.owner)
302 carrier_org = (
303 await orgs_col.find_single(
304 "_id", best_bid_owner_org_id, projection={"documents": 0}
305 )
306 if not best_bid.carrier
307 else best_bid.carrier.model_dump()
308 )
309 update_data = await data_from_assigned(order)
310 up_order = await orders_col.collection.find_one_and_update(
311 managing_query | {"id": order_id},
312 {
313 "$set": update_data
314 | {
315 OrderDBModel.status: OrderStatus.reserved.value,
316 OrderDBModel.reservation_end_time: get_current_datetime()
317 + timedelta(minutes=await get_reservation_interval()),
318 OrderDBModel.carrier: carrier_org,
319 OrderDBModel.end_price: best_bid.value,
320 },
321 },
322 )
323 if up_order is None:
324 raise NotFoundHTTPError("заказ")
325 om = OrderDBModel(**up_order)
326 await update_etag_and_text_search(
327 up_order, orders_col, OrderDBModel, get_orders_text_search
328 )
329 asyncio.create_task(notification_api.order_reserved(om))
330 notification_api.company_order_reserved(om)
331 notification_api.company_auction_ended(om)
332 notify_if_assigned(order, up_order)
333 asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
334 return om
337async def on_get_exchange_order_by_id(
338 user: SotransOIDCUserModel,
339 order_id: ObjectId,
340 params: BaseGetOneQueryParams,
341) -> dict[str, Any] | OrderDBModel | ResourceCheckOrderDBModel:
342 restriction_q = {}
343 is_company = has_role(user, SotransRole.company_logistician)
344 if is_company:
345 restriction_q.update(await flexible_company_assertion_query(user))
347 org_id = get_org_oid(user)
349 async def fill_bid(order: OrderDBModel):
350 if order.status in (OrderStatus.exchange, OrderStatus.reserved):
351 company_bid = await bids_col.collection.find_one(
352 {BidDBModel.carrier.id: org_id, BidDBModel.order_id: order_id}
353 )
354 if company_bid:
355 order.your_bid = BidDBModel(**company_bid)
357 processors: list[Callable] = [fill_bid]
358 processed: OrderDBModel = (
359 await orders_data_grabber.get_one_by_id_with_pattern(
360 order_id,
361 params,
362 restriction_q,
363 processors,
364 )
365 )
367 if processed.status != OrderStatus.exchange:
368 if not is_company:
369 hide_model_fields_from_carrier(processed)
370 return processed
371 order_model = ResourceCheckOrderDBModel(**processed.model_dump())
372 order_model.resource_check_passing = await can_apply_order(
373 org_id, order_model.truck_body
374 )
375 if not is_company:
376 hide_model_fields_from_carrier(order_model)
377 return order_model
380async def get_order_with_resource_check(
381 order_id: ObjectId,
382 params: BaseGetOneQueryParams,
383 user: SotransOIDCUserModel,
384) -> dict[str, Any] | ResourceCheckOrderDBModel:
385 order = await orders_data_grabber.get_one(order_id, params)
386 if not has_role(user, SotransRole.company_logistician):
387 hide_dict_fields_from_carrier(order)
388 org_oid = get_org_oid(user)
389 org = await orgs_col.find_single("_id", org_oid, projection={"_id": 1})
390 if org is None:
391 raise NotFoundHTTPError("пользователь не в организации")
392 order_model = ResourceCheckOrderDBModel(**order)
393 order_model.resource_check_passing = await can_apply_order(
394 org["_id"], order_model.truck_body
395 )
396 return order_model
399async def export_order(
400 user: SotransOIDCUserModel, params: BaseGetListQueryParams
401) -> str:
402 orders = await orders_data_grabber.get_list(params, user)
403 to_cut = not has_role(user, SotransRole.company_logistician)
404 return await export_orders_to_excel(orders.items, to_cut)
407async def on_order_to_appointment(
408 order_id: ObjectId,
409 user: SotransOIDCUserModel,
410 order: OrderUpdateModel | None,
411) -> dict[str, Any] | OrderDBModel:
412 update_data = await data_from_assigned(order)
413 managing_clients_query = await flexible_company_assertion_query(user)
414 etag_q = {"etag": order.etag} if order and order.etag else {}
415 up_order = await update_status_and_return(
416 order_id,
417 OrderStatus.appointment,
418 set_=update_data,
419 restriction=managing_clients_query | etag_q,
420 )
421 asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
422 asyncio.create_task(suggestion_api.remove_order(order_id, target=True))
423 notify_if_assigned(order, up_order)
424 return up_order
427async def on_put_exchange_to_buffer(
428 order_id: ObjectId, order: OrderUpdateModel
429) -> dict[str, Any] | OrderDBModel:
430 asyncio.create_task(suggestion_api.remove_order(order_id, target=True))
431 asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
432 update_data = await data_from_assigned(order)
433 buffered = await move_order_and_return(
434 order_id, orders_col, buf_col, OrderStatus.buffer, updates=update_data
435 )
436 upm = OrderDBModel(**buffered)
437 notify_if_assigned(order, upm)
438 asyncio.create_task(notification_api.new_in_buffer(upm))
439 return upm
442async def on_put_exchange_to_trash(
443 order_id: ObjectId, user: SotransOIDCUserModel
444):
445 restriction_q = assigned_subsidiary_or_logistician_for_order_query(user)
446 asyncio.create_task(suggestion_api.remove_order(order_id, target=True))
447 asyncio.create_task(suggestion_api.remove_order(order_id, target=False))
448 await move_order_and_return(
449 order_id,
450 orders_col,
451 trash_orders_col,
452 OrderStatus.archived,
453 restriction_q | {OrderDBModel.status: OrderStatus.exchange.value},
454 {OrderDBModel.deleted_at: datetime.datetime.utcnow()},
455 )