Coverage for api/services/location_connector.py: 18%

51 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import config 

2from aiohttp import ClientConnectorError, ServerDisconnectedError 

3from errors import log_error, log_warning 

4from exceptions import GeoAPINoResults, GeoAPIStatusException 

5from logging_config import logger 

6from services.microservice_connector import ServiceConnector 

7from sotrans_models.models.orders.order import GeoLocation, StopAddress 

8from sotrans_models.models.services.location_api import ( 

9 OpenStreetMapResponse, 

10 OSMReverseGeoResult, 

11 YandexSearchResponse, 

12) 

13 

14 

class LocationAPIConnector(ServiceConnector):
    """Client for the geocoding endpoints of the location service.

    Both public coroutines turn a free-form address line into a
    ``StopAddress``. They raise ``GeoAPIStatusException`` when the service
    answers with a non-200 status or the connection fails, and
    ``GeoAPINoResults`` when a Yandex-shaped reply contains no results.
    """

    @staticmethod
    def _stop_address_from_yandex(payload: dict) -> StopAddress:
        """Map the first result of a Yandex-shaped payload to a ``StopAddress``.

        Args:
            payload: Decoded JSON body, unpacked into ``YandexSearchResponse``.

        Raises:
            GeoAPINoResults: The parsed response has an empty ``results``.
        """
        response_model = YandexSearchResponse(**payload)
        if not response_model.results:
            raise GeoAPINoResults
        first_result = response_model.results[0]
        return StopAddress(
            line=response_model.meta.request,
            location_address=first_result.address,
            index=first_result.postal_code,
            country=first_result.country,
            province=first_result.province,
            city=first_result.locality,
            location=GeoLocation(
                longitude=first_result.longitude,
                latitude=first_result.latitude,
            ),
        )

    async def request_line_address_with_yapi(self, line: str) -> StopAddress:
        """Geocode ``line`` through the ``/api/v1.1/yandex/geocode`` endpoint.

        Args:
            line: Address text, sent as the ``address`` query parameter.

        Returns:
            A ``StopAddress`` built from the first result of the reply.

        Raises:
            GeoAPIStatusException: Non-200 status, or the server disconnected /
                the connection could not be established.
            GeoAPINoResults: The reply contains no results.
        """
        await self._check_client()
        params = {"address": line}
        try:
            async with self.session.get(
                f"{config.LocationConfig.LOCATION_API_URL}/api/v1.1/yandex/geocode",
                params=params,
            ) as resp:
                if resp.status != 200:
                    cont = await resp.content.read()
                    # Tolerate undecodable bytes so a malformed error body
                    # cannot replace the intended GeoAPIStatusException.
                    body = cont.decode(errors="replace")
                    logger.error(
                        f"Error on getting address [{line}]: status {resp.status}, content: {body}"
                    )
                    raise GeoAPIStatusException
                result = await resp.json()
        except (ServerDisconnectedError, ClientConnectorError) as err:
            # Keep the underlying connection error as the cause for debugging.
            # NOTE(review): unlike the OSM variant, nothing is logged here —
            # confirm whether that is intentional.
            raise GeoAPIStatusException from err
        return self._stop_address_from_yandex(result)

    async def request_line_address_with_osm(self, line: str) -> StopAddress:
        """Geocode ``line`` through the ``/osm/address`` endpoint.

        A reply containing a ``"meta"`` key is parsed as a Yandex-shaped
        response; any other reply is parsed as ``OSMReverseGeoResult``. In the
        latter case ``line`` itself is used for both ``line`` and
        ``location_address`` of the returned ``StopAddress``.

        Args:
            line: Address text, sent as the ``q`` query parameter.

        Returns:
            A ``StopAddress`` built from the reply.

        Raises:
            GeoAPIStatusException: Non-200 status, or the server disconnected /
                the connection could not be established.
            GeoAPINoResults: A Yandex-shaped reply contains no results.
        """
        await self._check_client()
        params = {"q": line}
        try:
            async with self.session.get(
                f"{config.LocationConfig.LOCATION_API_URL}/osm/address",
                params=params,
            ) as resp:
                if resp.status != 200:
                    # Tolerate undecodable bytes so a malformed error body
                    # cannot replace the intended GeoAPIStatusException.
                    body = (await resp.content.read()).decode(errors="replace")
                    log_warning(
                        "No response from locations backend, osm."
                        f" Status:[{resp.status}], "
                        f"content:{body}"
                    )
                    raise GeoAPIStatusException
                result = await resp.json()
        except (ServerDisconnectedError, ClientConnectorError) as err:
            log_error("Connection problem on loc service")
            # Keep the underlying connection error as the cause for debugging.
            raise GeoAPIStatusException from err

        if "meta" in result:
            return self._stop_address_from_yandex(result)

        response_model = OSMReverseGeoResult(**result)
        # Province is the list of whichever of region/state are present.
        province = [
            part
            for part in (response_model.region, response_model.state)
            if part is not None
        ]
        return StopAddress(
            line=line,
            location_address=line,
            index=response_model.postcode,
            country=response_model.country,
            city=response_model.city,
            province=province,
            location=GeoLocation(
                latitude=response_model.lat,
                longitude=response_model.lon,
            ),
        )

103 

104 

# Module-level connector instance; its timeout is taken from configuration.
geo_connector = LocationAPIConnector(
    timeout=config.LOCATION_API_TIMEOUT,
)