Coverage for api/services/location_connector.py: 18%
51 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import config
2from aiohttp import ClientConnectorError, ServerDisconnectedError
3from errors import log_error, log_warning
4from exceptions import GeoAPINoResults, GeoAPIStatusException
5from logging_config import logger
6from services.microservice_connector import ServiceConnector
7from sotrans_models.models.orders.order import GeoLocation, StopAddress
8from sotrans_models.models.services.location_api import (
9 OpenStreetMapResponse,
10 OSMReverseGeoResult,
11 YandexSearchResponse,
12)
class LocationAPIConnector(ServiceConnector):
    """Client for the geocoding endpoints of the location service.

    The service base URL is read from
    ``config.LocationConfig.LOCATION_API_URL``. Both public methods turn a
    free-form address line into a ``StopAddress``.
    """

    @staticmethod
    def _stop_address_from_yandex(payload: dict) -> StopAddress:
        """Build a ``StopAddress`` from a Yandex-shaped response payload.

        Args:
            payload: Decoded JSON body, validated via ``YandexSearchResponse``.

        Returns:
            The address built from the first result of the response.

        Raises:
            GeoAPINoResults: The response contains no results.
        """
        response_model = YandexSearchResponse(**payload)
        if not response_model.results:
            raise GeoAPINoResults
        first_result = response_model.results[0]
        return StopAddress(
            line=response_model.meta.request,
            location_address=first_result.address,
            index=first_result.postal_code,
            country=first_result.country,
            province=first_result.province,
            city=first_result.locality,
            location=GeoLocation(
                longitude=first_result.longitude,
                latitude=first_result.latitude,
            ),
        )

    async def request_line_address_with_yapi(self, line: str) -> StopAddress:
        """Geocode ``line`` through the ``/api/v1.1/yandex/geocode`` endpoint.

        Args:
            line: Free-form address text, sent as the ``address`` parameter.

        Returns:
            The ``StopAddress`` built from the first result.

        Raises:
            GeoAPIStatusException: The response status is not 200, or the
                connection failed (``ServerDisconnectedError`` /
                ``ClientConnectorError``).
            GeoAPINoResults: The response contains no results.
        """
        await self._check_client()
        params = {"address": line}
        try:
            async with self.session.get(
                f"{config.LocationConfig.LOCATION_API_URL}/api/v1.1/yandex/geocode",
                params=params,
            ) as resp:
                if resp.status != 200:
                    # Decode leniently: a non-UTF-8 error body must not turn
                    # into a UnicodeDecodeError that hides the status failure.
                    body = (await resp.content.read()).decode(errors="replace")
                    logger.error(
                        f"Error on getting address [{line}]: status {resp.status}, content: {body}"
                    )
                    raise GeoAPIStatusException
                result = await resp.json()
        except (ServerDisconnectedError, ClientConnectorError) as err:
            # Log and chain so the transport-level cause is not lost.
            logger.error(
                f"Connection problem on loc service for address [{line}]: {err!r}"
            )
            raise GeoAPIStatusException from err
        return self._stop_address_from_yandex(result)

    async def request_line_address_with_osm(self, line: str) -> StopAddress:
        """Geocode ``line`` through the ``/osm/address`` endpoint.

        Args:
            line: Free-form address text, sent as the ``q`` parameter.

        Returns:
            A ``StopAddress``. When the payload carries a ``meta`` key it is
            parsed as a Yandex-shaped response; otherwise it is parsed as an
            ``OSMReverseGeoResult`` and ``line`` is echoed back as both
            ``line`` and ``location_address``.

        Raises:
            GeoAPIStatusException: The response status is not 200, or the
                connection failed (``ServerDisconnectedError`` /
                ``ClientConnectorError``).
            GeoAPINoResults: A Yandex-shaped payload contains no results.
        """
        await self._check_client()
        params = {"q": line}
        try:
            async with self.session.get(
                f"{config.LocationConfig.LOCATION_API_URL}/osm/address",
                params=params,
            ) as resp:
                if resp.status != 200:
                    # Lenient decode keeps the status failure from being
                    # masked by a UnicodeDecodeError on a non-UTF-8 body.
                    body = (await resp.content.read()).decode(errors="replace")
                    log_warning(
                        "No response from locations backend, osm."
                        f" Status:[{resp.status}], "
                        f"content:{body}"
                    )
                    raise GeoAPIStatusException
                result = await resp.json()
        except (ServerDisconnectedError, ClientConnectorError) as err:
            log_error("Connection problem on loc service")
            raise GeoAPIStatusException from err

        # NOTE(review): a "meta" key marks a Yandex-shaped payload; presumably
        # the backend can fall back to Yandex for this endpoint - confirm.
        if "meta" in result:
            return self._stop_address_from_yandex(result)

        response_model = OSMReverseGeoResult(**result)
        # Province is the list of whichever of region/state are present.
        province = [
            part
            for part in (response_model.region, response_model.state)
            if part is not None
        ]
        return StopAddress(
            line=line,
            location_address=line,
            index=response_model.postcode,
            country=response_model.country,
            city=response_model.city,
            province=province,
            location=GeoLocation(
                latitude=response_model.lat,
                longitude=response_model.lon,
            ),
        )
# Module-level connector instance, built with the timeout taken from config.
geo_connector = LocationAPIConnector(
    timeout=config.LOCATION_API_TIMEOUT,
)