Coverage for api/utils/helper.py: 22%

155 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import itertools 

2import json 

3import math 

4import os 

5import random 

6import re 

7import string 

8import uuid 

9from typing import Any, Awaitable, Callable, TypeVar 

10 

11from bson import ObjectId 

12from bson.errors import InvalidId 

13from dbcc import MongoTableEngine 

14from exceptions import ( 

15 BadParameterHTTPError, 

16 GeoAPINoResults, 

17 GeoAPIStatusException, 

18 NotFoundHTTPError, 

19 SubsidiaryIsNotAssigned, 

20) 

21from mongodb import users_col 

22from services.location_connector import geo_connector 

23from sotrans_models.models._base import EntityHash 

24from sotrans_models.models.orders.order import ( 

25 OrderCreateModel, 

26 OrderDBModel, 

27 StopModel, 

28) 

29from sotrans_models.models.organizations import OrganizationDBModel 

30from sotrans_models.models.responses import MultiselectError 

31from sotrans_models.models.users import ( 

32 SotransOIDCUserModel, 

33 SotransUserDBModel, 

34) 

35from starlette.exceptions import HTTPException 

36from utils.concurrency import threadpool 

37 

38Order = TypeVar("Order", bound=OrderCreateModel) 

39 

40 

41def place_stop_indicies(order: Order): 

42 if order.stops: 

43 i = 0 

44 for _ in order.stops: 

45 order.stops[i].index = i + 1 

46 i += 1 

47 

48 

49def get_org_oid( 

50 executor: SotransOIDCUserModel | SotransUserDBModel, 

51) -> ObjectId: 

52 try: 

53 if executor.organization_id is None: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true

54 raise InvalidId 

55 return ObjectId(executor.organization_id) 

56 except InvalidId: 

57 raise NotFoundHTTPError("организация действующего лица") 

58 

59 

60async def get_subsidiary_oid( 

61 executor: SotransOIDCUserModel | SotransUserDBModel, 

62) -> ObjectId: 

63 user = await users_col.collection.find_one( 

64 { 

65 "id": executor.sub 

66 if isinstance(executor, SotransOIDCUserModel) 

67 else executor.id 

68 } 

69 ) 

70 if user is None: 

71 raise SubsidiaryIsNotAssigned 

72 user_model = SotransUserDBModel(**user) 

73 if user_model.subsidiary_id is None: 

74 raise SubsidiaryIsNotAssigned 

75 try: 

76 return ObjectId(user_model.subsidiary_id) 

77 except InvalidId: 

78 raise SubsidiaryIsNotAssigned 

79 

80 

81def stop_sort_key(s: StopModel): 

82 secondary_sort_mapping = { 

83 "loading": 1, 

84 "unloading": 0, 

85 } 

86 return s.datetime, secondary_sort_mapping.get(s.stop_type) 

87 

88 

89async def update_order_address(order_data: dict[str, Any], by_external: bool): 

90 if "stops" not in order_data: 

91 return 

92 stops = [] 

93 for stop in order_data["stops"]: 

94 try: 

95 if by_external: 

96 stop_address = ( 

97 await geo_connector.request_line_address_with_osm( 

98 stop["address"]["line"] 

99 ) 

100 ) 

101 else: 

102 stop_address = ( 

103 await geo_connector.request_line_address_with_yapi( 

104 stop["address"]["line"] 

105 ) 

106 ) 

107 except (GeoAPIStatusException, GeoAPINoResults): 

108 stops.append(stop) 

109 continue 

110 stop["address"] = stop_address.model_dump() 

111 stops.append(stop) 

112 stops_models = [StopModel(**stop) for stop in stops] 

113 stops_with_meta = list( 

114 filter(lambda s: s.datetime and s.address.location, stops_models) 

115 ) 

116 if len(stops_with_meta) > 1: 

117 try: 

118 stops_with_meta.sort(key=stop_sort_key) 

119 except TypeError: 

120 raise BadParameterHTTPError("неверно заполнены остановки") 

121 pairs = itertools.pairwise(stops_with_meta) 

122 distance = 0.0 

123 for a, b in pairs: 

124 distance += calculate_ab_distance( 

125 a.address.location.latitude, 

126 a.address.location.longitude, 

127 b.address.location.latitude, 

128 b.address.location.longitude, 

129 ) 

130 order_data[OrderDBModel.route_km] = int(distance) 

131 order_data["stops"] = stops 

132 

133 

134def add_prices_to_update( 

135 order_data: dict, 

136 update_data: dict, 

137 start_price: int | None, 

138 end_price: int | None, 

139): 

140 if not (start_price or end_price): 

141 return 

142 route_km = order_data.get(OrderDBModel.route_km) 

143 if not route_km: # 0 or None or empty 

144 return 

145 if start_price: 

146 update_data[OrderDBModel.start_price_km] = int(start_price / route_km) 

147 if end_price: 

148 update_data[OrderDBModel.end_price_km] = int(end_price / route_km) 

149 

150 

151def clean_empty_objects(data: dict): 

152 fields = ( 

153 OrderDBModel.carrier, 

154 OrderDBModel.trailer, 

155 OrderDBModel.truck, 

156 OrderDBModel.driver, 

157 ) 

158 for field in fields: 

159 if field in data and "id" not in data[field]: 

160 del data[field] 

161 

162 

163def validate_search_q(search_query): 

164 if len(search_query) > 32: 

165 search_query = search_query[:32] 

166 search_query = re.sub(r"[^а-яА-Яa-zA-Z0-9 ]", "", search_query) 

167 return search_query 

168 

169 

170def check_assigned(order): 

171 assigned = ( 

172 order.assigned 

173 and order.assigned.company 

174 and ( 

175 order.assigned.company.employee 

176 and order.assigned.company.employee.id 

177 or order.assigned.company.subsidiary 

178 and order.assigned.company.subsidiary.id 

179 ) 

180 ) 

181 return assigned 

182 

183 

184def calculate_ab_distance( 

185 lat_a: float, lon_a: float, lat_b: float, lon_b: float 

186) -> float: 

187 fi_a = math.radians(lat_a) 

188 fi_b = math.radians(lat_b) 

189 d_lat = math.radians(lat_b - lat_a) 

190 d_lon = math.radians(lon_b - lon_a) 

191 a = ( 

192 math.sin(d_lat / 2) ** 2 

193 + math.cos(fi_a) * math.cos(fi_b) * math.sin(d_lon / 2) ** 2 

194 ) 

195 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) 

196 return 6371 * c 

197 

198 

199def is_organization_filled(org_data: dict) -> bool: 

200 org = OrganizationDBModel(**org_data) 

201 return bool( 

202 org.registration_date 

203 and org.name 

204 and org.short_name 

205 and org.head 

206 and org.ownership_type 

207 and org.logo 

208 and org.legal_address 

209 and org.factual_address 

210 and org.phone 

211 and org.email 

212 and org.kpp 

213 and org.ogrn 

214 and org.taxation_type 

215 and org.bik 

216 and org.ifns_code 

217 and org.bank 

218 and org.contact_user 

219 and org.documents 

220 ) 

221 

222 

223def get_hash(entity: dict[str, Any]) -> EntityHash: 

224 return str(hash(str(entity))) 

225 

226 

227@threadpool 

228def write_1c_to_file(data, endpoint_name: str): 

229 os.makedirs("/files/", exist_ok=True) 

230 with open(f"/files/{endpoint_name}_{uuid.uuid4()}.json", "w") as f: 

231 json.dump(data, f, ensure_ascii=False) 

232 

233 

234def get_ids_list(path: str) -> list[ObjectId]: 

235 str_mb_ids = path.split(",") 

236 ids = [] 

237 for smi in str_mb_ids: 

238 try: 

239 ids.append(ObjectId(smi)) 

240 except InvalidId: 

241 raise BadParameterHTTPError(f"[{smi}] не является ObjectId") 

242 return ids 

243 

244 

245async def multiexec( 

246 path: str, cb: Callable[..., Awaitable[Any]], *args, **kwargs 

247) -> MultiselectError: 

248 ids = get_ids_list(path) 

249 id_to_error = {} 

250 for i in ids: 

251 try: 

252 await cb(i, *args, **kwargs) 

253 except HTTPException as e: 

254 id_to_error[str(i)[-6:]] = e.detail 

255 return MultiselectError(raised_ids=id_to_error) 

256 

257 

258def secure_pass_generator(): 

259 return "".join( 

260 random.SystemRandom().choice(string.ascii_uppercase + string.digits) 

261 for _ in range(16) 

262 ) 

263 

264 

265async def etag_detalizer( 

266 engine: MongoTableEngine, 

267 etag_container: dict[str, Any], 

268 search_q: dict[str, Any], 

269): 

270 if "etag" in etag_container: 

271 is_raced = await engine.collection.find_one(search_q) 

272 if is_raced: 

273 raise NotFoundHTTPError("данные устарели") 

274 

275 

276async def ensure_yapi_location(order_data: dict[str, Any]): 

277 if "stops" not in order_data or not order_data["stops"]: 

278 return 

279 for stop in order_data["stops"]: 

280 if isinstance(stop.get("address"), dict): 

281 for f in ("province", "index", "country", "city"): 

282 if stop["address"].get(f) is not None: 

283 return 

284 await update_order_address(order_data, by_external=False) 

285 

286 

287def clear_user_for_keycloak(user_data: dict[str, Any]): 

288 for f in ("role", "text_search", "roles", "subsidiary_name", "password"): 

289 user_data.pop(f, None)