Coverage for api/handlers/orders/external_orders.py: 15%
86 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
3import motor
4from dbcc import MongoTableEngine
5from errors import log_warning
6from exceptions import BadParameterHTTPError, NotFoundHTTPError
7from mongodb import buf_col, orders_col, trash_orders_col
8from operations.orders import (
9 create_order_with_second_phase,
10 prepare_to_create_order,
11)
12from pymongo import ReturnDocument
13from pymongo.errors import WriteError
14from services.notifications.director import (
15 ScrapedNotificationContainer,
16 notification_api,
17)
18from services.recommendations import suggestion_api
19from sotrans_models.models.orders.order import (
20 ExternalOrderStatus,
21 OrderCreateModel,
22 OrderDBModel,
23 OrderStatus,
24)
25from sotrans_models.models.users import SotransOIDCUserModel
26from utils.helper import update_order_address
29async def on_post_batch(
30 executor: SotransOIDCUserModel,
31 orders: list[OrderCreateModel],
32 container: ScrapedNotificationContainer,
33):
34 raw_orders = []
35 for order in orders:
36 if not order.external:
37 log_warning("No external order")
38 continue
39 order.external.original_order = order.model_dump()
40 if order.external.unique_id:
41 existing_order = await buf_col.find_single(
42 "external.unique_id", order.external.unique_id
43 )
44 if existing_order:
45 continue
46 else:
47 if not order.external.id:
48 continue
49 order.external.unique_id = (
50 f"{order.external.resource_name}_{order.external.id}"
51 )
52 raw_orders.append(
53 prepare_to_create_order(executor, order, by_external=True)
54 )
55 inserted_ids = []
56 for prep in raw_orders:
57 try:
58 order_data = await prep
59 except (BadParameterHTTPError, NotFoundHTTPError):
60 continue
61 created = await create_order_with_second_phase(order_data, buf_col)
62 inserted_ids.append(created.id)
64 inserted = await buf_col.find_batch(
65 {"id": {"$in": inserted_ids}}, projection={"client": 1}
66 )
67 if inserted:
68 asyncio.create_task(
69 notification_api.orders_scraped(
70 [OrderDBModel(**i) for i in inserted], container
71 )
72 )
75async def on_patch_batch(orders: list[OrderCreateModel]):
76 delta_fields = (
77 OrderDBModel.stops[0],
78 OrderDBModel.client_price,
79 OrderDBModel.truck_body,
80 )
82 async def update_delta(
83 collection: MongoTableEngine, update_data_f: dict, updated_f: dict
84 ):
85 prev_data = {
86 k: updated_f[k]
87 for k in delta_fields
88 if updated_f[k] != update_data_f[k]
89 }
90 try:
91 await collection.collection.update_one(
92 {"external.unique_id": order.external.unique_id},
93 {"$push": {"updated": prev_data}},
94 )
95 except WriteError:
96 await collection.collection.update_one(
97 {"external.unique_id": order.external.unique_id},
98 {"$set": {"updated": [prev_data]}},
99 )
101 for order in orders:
102 order_data = order.model_dump()
103 from_exchange_delta = {}
104 if order.external.from_exchange:
105 from_exchange_delta = {OrderDBModel.external.from_exchange: True}
106 await update_order_address(order_data, by_external=True)
107 update_data = {k: order_data[k] for k in delta_fields}
108 buffer_motor_col: motor.MotorCollection = buf_col.collection
109 updated = await buffer_motor_col.find_one_and_update(
110 {"external.unique_id": order.external.unique_id},
111 {"$set": update_data | from_exchange_delta},
112 )
113 if updated:
114 await update_delta(buf_col, update_data, updated)
115 continue
116 exchange_motor_col: motor.MotorCollection = orders_col.collection
117 updated = await exchange_motor_col.find_one_and_update(
118 {"external.unique_id": order.external.unique_id},
119 {"$set": update_data | from_exchange_delta},
120 return_document=ReturnDocument.AFTER,
121 )
122 if updated:
123 await update_delta(orders_col, update_data, updated)
124 updated_model = OrderDBModel(**updated)
125 asyncio.create_task(
126 suggestion_api.update_order(
127 updated_model.id, updated_model, target=False
128 )
129 )
130 asyncio.create_task(
131 suggestion_api.update_order(
132 updated_model.id, updated_model, target=True
133 )
134 )
137async def on_delete_batch(orders: list[OrderDBModel]):
138 for order in orders:
139 match_query = {"external.unique_id": order.external.unique_id}
140 buf_del = await buf_col.collection.find_one_and_delete(match_query)
141 if buf_del:
142 continue
143 trash_motor: motor.MotorCollection = trash_orders_col.collection
144 trash_del = await trash_motor.find_one_and_delete(match_query)
145 if trash_del:
146 continue
147 exch_del = await orders_col.collection.find_one_and_delete(
148 match_query
149 | {
150 OrderDBModel.status: {
151 "$in": [
152 OrderStatus.exchange.value,
153 OrderStatus.appointment.value,
154 ]
155 }
156 }
157 )
158 if exch_del:
159 exch_del["external"]["status"] = ExternalOrderStatus.missing.value
160 await trash_orders_col.create(exch_del)
161 oid = exch_del["id"]
162 asyncio.create_task(suggestion_api.remove_order(oid, target=False))
163 asyncio.create_task(suggestion_api.remove_order(oid, target=True))
164 continue
165 await orders_col.collection.update_one(
166 match_query
167 | {
168 OrderDBModel.status: {
169 "$in": [
170 OrderStatus.confirmed.value,
171 OrderStatus.reserved.value,
172 OrderStatus.unverified.value,
173 OrderStatus.active.value,
174 ]
175 }
176 },
177 {"$set": {"external.status": ExternalOrderStatus.missing.value}},
178 )