Coverage for api/utils/helper.py: 22%
155 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import itertools
2import json
3import math
4import os
5import random
6import re
7import string
8import uuid
9from typing import Any, Awaitable, Callable, TypeVar
11from bson import ObjectId
12from bson.errors import InvalidId
13from dbcc import MongoTableEngine
14from exceptions import (
15 BadParameterHTTPError,
16 GeoAPINoResults,
17 GeoAPIStatusException,
18 NotFoundHTTPError,
19 SubsidiaryIsNotAssigned,
20)
21from mongodb import users_col
22from services.location_connector import geo_connector
23from sotrans_models.models._base import EntityHash
24from sotrans_models.models.orders.order import (
25 OrderCreateModel,
26 OrderDBModel,
27 StopModel,
28)
29from sotrans_models.models.organizations import OrganizationDBModel
30from sotrans_models.models.responses import MultiselectError
31from sotrans_models.models.users import (
32 SotransOIDCUserModel,
33 SotransUserDBModel,
34)
35from starlette.exceptions import HTTPException
36from utils.concurrency import threadpool
38Order = TypeVar("Order", bound=OrderCreateModel)
def place_stop_indicies(order: Order):
    """Number the stops of ``order`` in place, starting from 1.

    Every element of ``order.stops`` gets its ``index`` attribute set to its
    1-based position in the list. Nothing happens when ``order.stops`` is
    empty or ``None``.
    """
    if not order.stops:
        return
    for position, stop in enumerate(order.stops, start=1):
        stop.index = position
def get_org_oid(
    executor: SotransOIDCUserModel | SotransUserDBModel,
) -> ObjectId:
    """Return the executor's organization id as an ``ObjectId``.

    Raises:
        NotFoundHTTPError: if ``executor.organization_id`` is ``None`` or
            cannot be converted to an ``ObjectId``.
    """
    message = "организация действующего лица"
    organization_id = executor.organization_id
    # A missing id is reported exactly like a malformed one.
    if organization_id is None:
        raise NotFoundHTTPError(message)
    try:
        return ObjectId(organization_id)
    except InvalidId as exc:
        raise NotFoundHTTPError(message) from exc
async def get_subsidiary_oid(
    executor: SotransOIDCUserModel | SotransUserDBModel,
) -> ObjectId:
    """Look up the executor's user record and return its subsidiary id.

    The user document is fetched from ``users_col`` by its ``"id"`` field,
    using ``executor.sub`` when the executor is a ``SotransOIDCUserModel``
    and ``executor.id`` otherwise.

    Raises:
        SubsidiaryIsNotAssigned: if no user document is found, the stored
            ``subsidiary_id`` is ``None``, or it is not a valid ObjectId.
    """
    if isinstance(executor, SotransOIDCUserModel):
        user_id = executor.sub
    else:
        user_id = executor.id

    user_doc = await users_col.collection.find_one({"id": user_id})
    if user_doc is None:
        raise SubsidiaryIsNotAssigned

    subsidiary_id = SotransUserDBModel(**user_doc).subsidiary_id
    if subsidiary_id is None:
        raise SubsidiaryIsNotAssigned

    try:
        return ObjectId(subsidiary_id)
    except InvalidId:
        raise SubsidiaryIsNotAssigned
def stop_sort_key(s: StopModel):
    """Build the sort key of a stop: ``(datetime, stop type rank)``.

    Stops are ordered by ``s.datetime`` first. The second element ranks
    ``"unloading"`` (0) ahead of ``"loading"`` (1); any other ``stop_type``
    produces ``None`` in that position.
    """
    type_rank = {"unloading": 0, "loading": 1}.get(s.stop_type)
    return s.datetime, type_rank
async def update_order_address(order_data: dict[str, Any], by_external: bool):
    """Geocode the order's stops and refresh its route length, in place.

    Does nothing when ``order_data`` has no ``"stops"`` key. Otherwise each
    stop's ``address["line"]`` is resolved through ``geo_connector`` using
    ``request_line_address_with_osm`` when ``by_external`` is true and
    ``request_line_address_with_yapi`` otherwise, and the stop's
    ``"address"`` is replaced by the dumped result. A stop whose lookup
    raises ``GeoAPIStatusException`` or ``GeoAPINoResults`` keeps the address
    it came with.

    When at least two stops have both a datetime and an address location,
    they are ordered with ``stop_sort_key`` and the summed distance between
    consecutive stops, truncated to ``int``, is stored under
    ``OrderDBModel.route_km``.

    Raises:
        BadParameterHTTPError: if sorting the stops raises ``TypeError``.
    """
    if "stops" not in order_data:
        return

    if by_external:
        resolve_line = geo_connector.request_line_address_with_osm
    else:
        resolve_line = geo_connector.request_line_address_with_yapi

    processed = []
    for stop in order_data["stops"]:
        try:
            resolved = await resolve_line(stop["address"]["line"])
        except (GeoAPIStatusException, GeoAPINoResults):
            # Geocoding is best effort: keep the stop as it was submitted.
            pass
        else:
            stop["address"] = resolved.model_dump()
        processed.append(stop)

    models = [StopModel(**stop) for stop in processed]
    # Only stops that carry both a time and coordinates take part in the
    # route length calculation.
    routable = [m for m in models if m.datetime and m.address.location]
    if len(routable) > 1:
        try:
            routable.sort(key=stop_sort_key)
        except TypeError:
            raise BadParameterHTTPError("неверно заполнены остановки")
        total_km = 0.0
        for origin, destination in itertools.pairwise(routable):
            total_km += calculate_ab_distance(
                origin.address.location.latitude,
                origin.address.location.longitude,
                destination.address.location.latitude,
                destination.address.location.longitude,
            )
        order_data[OrderDBModel.route_km] = int(total_km)
    order_data["stops"] = processed
134def add_prices_to_update(
135 order_data: dict,
136 update_data: dict,
137 start_price: int | None,
138 end_price: int | None,
139):
140 if not (start_price or end_price):
141 return
142 route_km = order_data.get(OrderDBModel.route_km)
143 if not route_km: # 0 or None or empty
144 return
145 if start_price:
146 update_data[OrderDBModel.start_price_km] = int(start_price / route_km)
147 if end_price:
148 update_data[OrderDBModel.end_price_km] = int(end_price / route_km)
def clean_empty_objects(data: dict):
    """Drop carrier/trailer/truck/driver entries that carry no ``"id"``.

    ``data`` is modified in place: for each of the ``OrderDBModel`` fields
    ``carrier``, ``trailer``, ``truck`` and ``driver``, the entry is deleted
    when it is present and ``"id"`` is not contained in its value.
    """
    candidates = (
        OrderDBModel.carrier,
        OrderDBModel.trailer,
        OrderDBModel.truck,
        OrderDBModel.driver,
    )
    for field in candidates:
        if field not in data:
            continue
        if "id" not in data[field]:
            del data[field]
def validate_search_q(search_query):
    """Sanitize a free-text search query.

    The query is first cut to at most 32 characters; then every character
    that is not a Cyrillic letter (``а``-``я``, ``ё``, upper or lower case),
    an ASCII letter, an ASCII digit or a space is removed.

    Returns:
        The sanitized string, which may be shorter than 32 characters or
        empty.
    """
    truncated = search_query[:32]
    # "ё"/"Ё" lie outside the contiguous а-я / А-Я ranges and have to be
    # listed explicitly, otherwise they are stripped from Russian words.
    return re.sub(r"[^а-яА-ЯёЁa-zA-Z0-9 ]", "", truncated)
def check_assigned(order):
    """Return a truthy value when the order is assigned to someone.

    The result is the assigned company's employee id when it is truthy,
    otherwise the assigned company's subsidiary id. When any link of the
    chain (``assigned``, ``company``, ``subsidiary``) is falsy, that falsy
    value itself is returned.
    """
    assigned = order.assigned
    if not assigned:
        return assigned
    company = assigned.company
    if not company:
        return company

    employee = company.employee
    if employee:
        employee_id = employee.id
        if employee_id:
            return employee_id

    # No usable employee id: fall back to the subsidiary.
    subsidiary = company.subsidiary
    if not subsidiary:
        return subsidiary
    return subsidiary.id
def calculate_ab_distance(
    lat_a: float, lon_a: float, lat_b: float, lon_b: float
) -> float:
    """Return the great-circle distance between points A and B.

    Uses the haversine formula on a sphere of radius 6371 (the mean Earth
    radius in kilometres), so the result is in kilometres.

    Args:
        lat_a: Latitude of point A, in degrees.
        lon_a: Longitude of point A, in degrees.
        lat_b: Latitude of point B, in degrees.
        lon_b: Longitude of point B, in degrees.
    """
    fi_a = math.radians(lat_a)
    fi_b = math.radians(lat_b)
    d_lat = math.radians(lat_b - lat_a)
    d_lon = math.radians(lon_b - lon_a)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(fi_a) * math.cos(fi_b) * math.sin(d_lon / 2) ** 2
    )
    # Rounding can push ``a`` marginally above 1 for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, a)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return 6371 * c
def is_organization_filled(org_data: dict) -> bool:
    """Tell whether every organization field checked here is filled in.

    ``org_data`` is loaded into ``OrganizationDBModel``; the result is
    ``True`` only when all of the attributes listed below are truthy.
    """
    required_fields = (
        "registration_date",
        "name",
        "short_name",
        "head",
        "ownership_type",
        "logo",
        "legal_address",
        "factual_address",
        "phone",
        "email",
        "kpp",
        "ogrn",
        "taxation_type",
        "bik",
        "ifns_code",
        "bank",
        "contact_user",
        "documents",
    )
    org = OrganizationDBModel(**org_data)
    # The generator keeps the checks lazy: evaluation stops at the first
    # empty field.
    return all(getattr(org, name) for name in required_fields)
def get_hash(entity: dict[str, Any]) -> EntityHash:
    """Return a digest of ``entity`` that is stable across processes.

    The digest is the SHA-256 hex string of ``str(entity)``. The built-in
    ``hash()`` is deliberately not used: for strings it is salted per
    interpreter process, so the same entity would hash differently in
    another worker or after a restart.

    Note that the digest depends on ``str(entity)``, so two dicts with the
    same items in a different insertion order produce different digests.
    """
    import hashlib

    text = str(entity)
    # "surrogatepass" keeps lone surrogates from raising during encoding.
    return hashlib.sha256(text.encode("utf-8", "surrogatepass")).hexdigest()
@threadpool
def write_1c_to_file(data, endpoint_name: str):
    """Dump ``data`` as JSON into a new, uniquely named file in ``/files/``.

    The directory is created when missing. The file name is
    ``<endpoint_name>_<random uuid4>.json``.

    Args:
        data: JSON-serializable payload.
        endpoint_name: Prefix of the file name.
    """
    os.makedirs("/files/", exist_ok=True)
    target = f"/files/{endpoint_name}_{uuid.uuid4()}.json"
    # ensure_ascii=False writes non-ASCII characters as-is, so the file
    # encoding must not depend on the process locale.
    with open(target, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False)
def get_ids_list(path: str) -> list[ObjectId]:
    """Parse a comma-separated string of ids into ``ObjectId`` values.

    Raises:
        BadParameterHTTPError: on the first piece that is not a valid
            ObjectId; the message quotes that piece.
    """
    parsed = []
    for raw_id in path.split(","):
        try:
            oid = ObjectId(raw_id)
        except InvalidId:
            raise BadParameterHTTPError(f"[{raw_id}] не является ObjectId")
        parsed.append(oid)
    return parsed
async def multiexec(
    path: str, cb: Callable[..., Awaitable[Any]], *args, **kwargs
) -> MultiselectError:
    """Run ``cb`` for every id listed in ``path`` and collect HTTP errors.

    ``path`` is parsed with ``get_ids_list``; ``cb`` is awaited once per id,
    one after another, as ``cb(oid, *args, **kwargs)``. An ``HTTPException``
    raised by a call does not stop the loop: its ``detail`` is recorded
    under the last six characters of the id's string form.

    Returns:
        A ``MultiselectError`` whose ``raised_ids`` maps those shortened ids
        to the error details (empty when every call succeeded).
    """
    errors_by_id = {}
    for oid in get_ids_list(path):
        try:
            await cb(oid, *args, **kwargs)
        except HTTPException as exc:
            short_id = str(oid)[-6:]
            errors_by_id[short_id] = exc.detail
    return MultiselectError(raised_ids=errors_by_id)
def secure_pass_generator():
    """Generate a random 16-character password.

    Characters are drawn from ASCII uppercase letters and digits using
    ``random.SystemRandom``, which takes its randomness from the operating
    system.
    """
    # Build the alphabet and the generator once instead of per character.
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return "".join(rng.choice(alphabet) for _ in range(16))
async def etag_detalizer(
    engine: MongoTableEngine,
    etag_container: dict[str, Any],
    search_q: dict[str, Any],
):
    """Raise when an etag was supplied and ``search_q`` matches a document.

    Without an ``"etag"`` key in ``etag_container`` nothing is queried.
    Otherwise ``engine.collection`` is searched with ``search_q``.

    Raises:
        NotFoundHTTPError: if the search returns a truthy result.
    """
    if "etag" not in etag_container:
        return
    stale_match = await engine.collection.find_one(search_q)
    if stale_match:
        raise NotFoundHTTPError("данные устарели")
async def ensure_yapi_location(order_data: dict[str, Any]):
    """Geocode the order's stops unless an address already has details.

    Returns without doing anything when ``order_data`` has no stops, or
    when any stop whose ``"address"`` is a dict has a non-``None``
    ``province``, ``index``, ``country`` or ``city``. Otherwise
    ``update_order_address`` is awaited with ``by_external=False``.
    """
    stops = order_data.get("stops")
    if not stops:
        return

    detail_fields = ("province", "index", "country", "city")
    for stop in stops:
        address = stop.get("address")
        if not isinstance(address, dict):
            continue
        if any(address.get(name) is not None for name in detail_fields):
            return

    await update_order_address(order_data, by_external=False)
def clear_user_for_keycloak(user_data: dict[str, Any]):
    """Remove a fixed set of keys from ``user_data`` in place.

    The keys ``role``, ``text_search``, ``roles``, ``subsidiary_name`` and
    ``password`` are deleted when present; missing keys are ignored.
    """
    dropped_keys = (
        "role",
        "text_search",
        "roles",
        "subsidiary_name",
        "password",
    )
    for key in dropped_keys:
        if key in user_data:
            del user_data[key]