Coverage for api/database/orders.py: 20%
81 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import copy
2from datetime import timedelta
3from typing import Any
5from bson import ObjectId
6from database.entity import update_etag_and_text_search
7from dbcc import MongoTableEngine
8from exceptions import NotFoundHTTPError
9from mongodb import buf_col, orders_col, settings_col, trash_orders_col
10from operations.orders import clear_documents, create_order_with_second_phase
11from pymongo import ReturnDocument
12from sotrans_models.models.admin.settings import AdminSettings
13from sotrans_models.models.orders.bid import BidDBModel
14from sotrans_models.models.orders.order import OrderDBModel, OrderStatus
15from sotrans_models.utils.text_mappers import get_orders_text_search
16from utils.clear import clear_on_move
17from utils.dt_utils import get_current_datetime
18from utils.helper import ensure_yapi_location, etag_detalizer
21async def get_confirmation_interval() -> int:
22 settings = await settings_col.collection.find_one({})
23 settings = AdminSettings(**settings) if settings else AdminSettings()
24 return settings.confirmation_interval_minutes
27async def get_reservation_interval() -> int:
28 settings = await settings_col.collection.find_one({})
29 settings = AdminSettings(**settings) if settings else AdminSettings()
30 return settings.reservation_interval_minutes
33async def confirm_order_and_return(
34 order_id: ObjectId,
35 carrier_data: dict[str, Any],
36 best_bid: dict | None = None,
37 restriction_query: dict[str, Any] | None = None,
38 update_data: dict | None = None,
39):
40 """@:param carrier_data - data directly from database"""
41 if not update_data:
42 update_data = {}
43 update = {
44 OrderDBModel.status: OrderStatus.confirmed.value,
45 OrderDBModel.confirmation_end_time: get_current_datetime()
46 + timedelta(minutes=await get_confirmation_interval()),
47 OrderDBModel.carrier: carrier_data,
48 }
49 if best_bid is not None:
50 update[OrderDBModel.best_bid] = best_bid
51 update[OrderDBModel.end_price] = best_bid[BidDBModel.value]
52 if restriction_query is None:
53 restriction_query = {}
54 order = await orders_col.collection.find_one_and_update(
55 restriction_query | {"id": order_id},
56 {"$set": update_data | update},
57 return_document=ReturnDocument.AFTER,
58 )
59 if order is None:
60 etag_container = copy.deepcopy(restriction_query)
61 restriction_query.pop("etag", None)
62 await etag_detalizer(
63 orders_col, etag_container, restriction_query | {"id": order_id}
64 )
65 raise NotFoundHTTPError("заказ")
66 await update_etag_and_text_search(
67 order, orders_col, OrderDBModel, get_orders_text_search
68 )
69 return order
72async def move_order_and_return(
73 order_id: ObjectId,
74 source_col: MongoTableEngine,
75 target_col: MongoTableEngine,
76 order_status: OrderStatus,
77 restrictions_query: dict[str, Any] | None = None,
78 updates: dict | None = None,
79 order_data: dict[str, Any] | None = None,
80) -> dict[str, Any]:
81 if restrictions_query is None:
82 restrictions_query = {}
83 if order_data:
84 await source_col.collection.delete_one(
85 restrictions_query | {"id": order_id}
86 )
87 else:
88 order_data = await source_col.collection.find_one_and_delete(
89 restrictions_query | {"id": order_id}, projection={"_id": 0}
90 )
91 if order_data is None:
92 raise NotFoundHTTPError("заказ(перемещён?)")
93 await clear_documents(order_data.get("documents"))
94 if updates is None:
95 updates = {}
96 order_data |= updates
97 order_data[OrderDBModel.status] = order_status.value
98 clear_on_move(order_data, order_status)
99 if source_col == buf_col and target_col != trash_orders_col:
100 await ensure_yapi_location(order_data)
101 created = await create_order_with_second_phase(
102 order_data, target_col, move=True
103 )
104 return created.model_dump()
107async def update_status_and_return(
108 order_id: ObjectId,
109 order_status: OrderStatus,
110 set_: dict | None = None,
111 push: dict | None = None,
112 restriction: dict | None = None,
113) -> dict[str, Any]:
114 if restriction is None:
115 restriction = {}
116 if set_ is None:
117 set_ = {}
118 if order_status in (OrderStatus.exchange, OrderStatus.appointment):
119 order_in = await orders_col.collection.find_one(
120 {"id": order_id} | restriction
121 )
122 if order_in is not None:
123 await clear_documents(order_in.get("documents"))
124 set_ |= {
125 k: None
126 for k in (
127 OrderDBModel.driver,
128 OrderDBModel.carrier,
129 OrderDBModel.truck,
130 OrderDBModel.trailer,
131 OrderDBModel.best_bid,
132 OrderDBModel.document_draft,
133 "documents",
134 )
135 }
136 update_q = {"$set": set_ | {OrderDBModel.status: order_status.value}}
137 if push:
138 update_q["$push"] = push
139 order = await orders_col.collection.find_one_and_update(
140 {"id": order_id} | restriction,
141 update_q,
142 return_document=ReturnDocument.AFTER,
143 )
144 if order is None:
145 etag_container = copy.deepcopy(restriction)
146 restriction.pop("etag", None)
147 await etag_detalizer(
148 orders_col, etag_container, restriction | {"id": order_id}
149 )
150 raise NotFoundHTTPError("order")
151 await update_etag_and_text_search(
152 order, orders_col, OrderDBModel, get_orders_text_search
153 )
154 return order