Coverage for api/services/microservice_connector.py: 28%

106 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio.exceptions 

2import time 

3from typing import Any, Literal 

4 

5import aiohttp 

6import config 

7import jwt 

8from aiohttp import ClientConnectorError, ServerTimeoutError 

9from async_oauthlib import OAuth2Session 

10from errors import log_error 

11from logging_config import logger 

12from oauthlib import oauth2 

13 

14 

15class ServiceConnector: 

16 def __init__( 

17 self, username: str = None, password: str = None, timeout: float = 10 

18 ): 

19 self.timeout = timeout 

20 self.session: aiohttp.ClientSession = None 

21 self.oauth_token_url = config.KeycloakConfig.TOKEN_URL 

22 self.client_id = config.KeycloakConfig.CLIENT_ID 

23 self.client_secret = config.KeycloakConfig.CLIENT_SECRET 

24 self.username = username 

25 self.password = password 

26 self.jwt_token = None 

27 self.exp = None 

28 

29 def _check_session(self): 

30 timeout = aiohttp.ClientTimeout(total=self.timeout) 

31 if not self.session or self.session.closed: 

32 self.session = aiohttp.ClientSession(timeout=timeout) 

33 

34 async def _check_client(self): 

35 self._check_session() 

36 # preventive expiration renewal 

37 if not self.exp or self.exp - time.time() < 5 * 60: 

38 await self._initialize() 

39 

40 async def _initialize(self): 

41 client = None 

42 if self.username and self.password: 

43 client = oauth2.LegacyApplicationClient(self.client_id) 

44 client.prepare_request_body(self.username, self.password) 

45 else: 

46 client = oauth2.BackendApplicationClient(self.client_id) 

47 client.prepare_request_body(include_client_id=True) 

48 try: 

49 async with OAuth2Session(client=client) as session: 

50 if self.username and self.password: 

51 response = await session.fetch_token( 

52 token_url=self.oauth_token_url, 

53 client_id=self.client_id, 

54 client_secret=self.client_secret, 

55 username=self.username, 

56 password=self.password, 

57 ) 

58 else: 

59 response = await session.fetch_token( 

60 token_url=self.oauth_token_url, 

61 client_id=self.client_id, 

62 client_secret=self.client_secret, 

63 audience=None, 

64 ) 

65 self.jwt_token = response["access_token"] 

66 jwt_token = jwt.decode( 

67 self.jwt_token, options={"verify_signature": False} 

68 ) 

69 self.exp = jwt_token["exp"] 

70 self.session.headers["Authorization"] = f"Bearer {self.jwt_token}" 

71 

72 except Exception as e: 

73 log_error() 

74 raise e 

75 

76 

class RecommendationsConnector(ServiceConnector):
    """Connector that sends GET/POST/PATCH/DELETE requests to a given URL."""

    async def make_request(
        self,
        api_url: str,
        api_path: str,
        method: Literal["PATCH", "POST", "DELETE", "GET"],
        json=None,
    ) -> list[dict[str, Any]] | dict[str, Any] | None:
        """Send one authenticated request to ``f"{api_url}{api_path}"``.

        Args:
            api_url: First part of the request URL.
            api_path: Second part, appended to ``api_url`` without a separator.
            method: HTTP method; ``json`` is sent only for POST and PATCH.
            json: Optional payload passed as ``json=`` to aiohttp.

        Returns:
            The decoded JSON body for a successful GET. ``None`` for every
            other method, for a non-GET request with an unexpected status,
            and when the server cannot be reached or the request times out.

        Raises:
            NotImplementedError: ``method`` is not one of the four supported.
            aiohttp.ClientError: no session is available, or a GET returned a
                status other than 200, 201 or 204.
        """
        # Reject unsupported methods before spending a token/session refresh.
        if method not in ("GET", "POST", "PATCH", "DELETE"):
            raise NotImplementedError
        await self._check_client()
        if not self.session:
            raise aiohttp.ClientError("Can not initialize session")
        req_path = f"{api_url}{api_path}"
        http_call = getattr(self.session, method.lower())
        if method in ("POST", "PATCH"):
            ctx = http_call(req_path, json=json)
        else:
            ctx = http_call(req_path)
        try:
            async with ctx as resp:
                if resp.status not in (200, 201, 204):
                    log_error(f"Request failed for {req_path}")
                    if method == "GET":
                        raise aiohttp.ClientError(str(resp.status))
                if method == "GET":
                    return await resp.json()
        except (
            ClientConnectorError,
            ServerTimeoutError,
            # Treated like an unreachable server, matching DocumentsConnector.
            asyncio.exceptions.TimeoutError,
        ):
            log_error("Recommendation server seems down.")
        return None

110 

111 

class DocumentsConnector(ServiceConnector):
    """Connector that POSTs JSON to a ``/doc`` endpoint with a 30s timeout."""

    def __init__(self):
        """Set up the base connector without credentials and a timeout of 30."""
        super().__init__(timeout=30)

    async def make_request(self, api_path: str, json) -> str | None:
        """POST ``json`` to ``f"{api_path}/doc"`` and return the response text.

        Returns:
            The response body as text when the status is 200 or 201.
            ``None`` for any other status, or when the connection fails or
            the request times out; each failure is reported via ``log_error``.
        """
        await self._check_client()
        endpoint = f"{api_path}/doc"
        server_down_errors = (
            ClientConnectorError,
            ServerTimeoutError,
            asyncio.exceptions.TimeoutError,
        )
        try:
            async with self.session.post(endpoint, json=json) as resp:
                if resp.status in (200, 201):
                    return await resp.text()
                log_error(f"Document service response status {resp.status}")
                return None
        except server_down_errors:
            log_error("Documents server seems down.")
        return None

134 

135 

class NotificationConnector(ServiceConnector):
    """Connector that POSTs JSON to a ``/notifications`` endpoint."""

    def __init__(self, api_path: str):
        """Set up the base connector without credentials.

        Args:
            api_path: URL prefix to which ``/notifications`` is appended.
        """
        super().__init__()
        self.api_path = api_path

    async def make_request(self, json, is_multicast=False) -> str | None:
        """POST ``json`` to the notifications endpoint.

        Args:
            json: Payload passed as ``json=`` to aiohttp.
            is_multicast: When true, ``/multicast`` is appended to the URL.

        Returns:
            The response body as text when the status is 200, 201 or 202.
            ``None`` for any other status, or when the connection fails or
            the request times out.
        """
        await self._check_client()
        suffix = "/multicast" if is_multicast else ""
        url = f"{self.api_path}/notifications{suffix}"
        try:
            async with self.session.post(url, json=json) as resp:
                if resp.status in (200, 201, 202):
                    return await resp.text()
                log_error(
                    f"Notifications server unauthorized or bad request, status: {resp.status}; content: {await resp.text()}"
                )
        except (
            ClientConnectorError,
            ServerTimeoutError,
            # Treated like an unreachable server, matching DocumentsConnector.
            asyncio.exceptions.TimeoutError,
        ):
            log_error("Notification server seems down.")
            return None
        # Reached only when the server answered with an unexpected status.
        logger.error("make_request FAILED")
        return None