Coverage for api/services/microservice_connector.py: 28%
106 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio.exceptions
2import time
3from typing import Any, Literal
5import aiohttp
6import config
7import jwt
8from aiohttp import ClientConnectorError, ServerTimeoutError
9from async_oauthlib import OAuth2Session
10from errors import log_error
11from logging_config import logger
12from oauthlib import oauth2
15class ServiceConnector:
16 def __init__(
17 self, username: str = None, password: str = None, timeout: float = 10
18 ):
19 self.timeout = timeout
20 self.session: aiohttp.ClientSession = None
21 self.oauth_token_url = config.KeycloakConfig.TOKEN_URL
22 self.client_id = config.KeycloakConfig.CLIENT_ID
23 self.client_secret = config.KeycloakConfig.CLIENT_SECRET
24 self.username = username
25 self.password = password
26 self.jwt_token = None
27 self.exp = None
29 def _check_session(self):
30 timeout = aiohttp.ClientTimeout(total=self.timeout)
31 if not self.session or self.session.closed:
32 self.session = aiohttp.ClientSession(timeout=timeout)
34 async def _check_client(self):
35 self._check_session()
36 # preventive expiration renewal
37 if not self.exp or self.exp - time.time() < 5 * 60:
38 await self._initialize()
40 async def _initialize(self):
41 client = None
42 if self.username and self.password:
43 client = oauth2.LegacyApplicationClient(self.client_id)
44 client.prepare_request_body(self.username, self.password)
45 else:
46 client = oauth2.BackendApplicationClient(self.client_id)
47 client.prepare_request_body(include_client_id=True)
48 try:
49 async with OAuth2Session(client=client) as session:
50 if self.username and self.password:
51 response = await session.fetch_token(
52 token_url=self.oauth_token_url,
53 client_id=self.client_id,
54 client_secret=self.client_secret,
55 username=self.username,
56 password=self.password,
57 )
58 else:
59 response = await session.fetch_token(
60 token_url=self.oauth_token_url,
61 client_id=self.client_id,
62 client_secret=self.client_secret,
63 audience=None,
64 )
65 self.jwt_token = response["access_token"]
66 jwt_token = jwt.decode(
67 self.jwt_token, options={"verify_signature": False}
68 )
69 self.exp = jwt_token["exp"]
70 self.session.headers["Authorization"] = f"Bearer {self.jwt_token}"
72 except Exception as e:
73 log_error()
74 raise e
77class RecommendationsConnector(ServiceConnector):
78 async def make_request(
79 self,
80 api_url: str,
81 api_path: str,
82 method: Literal["PATCH", "POST", "DELETE", "GET"],
83 json=None,
84 ) -> list[dict[str, Any]] | dict[str, Any] | None:
85 await self._check_client()
86 if not self.session:
87 raise aiohttp.ClientError("Can not initialize session")
88 req_path = f"{api_url}{api_path}"
89 if method == "GET":
90 ctx = self.session.get(req_path)
91 elif method == "POST":
92 ctx = self.session.post(req_path, json=json)
93 elif method == "PATCH":
94 ctx = self.session.patch(req_path, json=json)
95 elif method == "DELETE":
96 ctx = self.session.delete(req_path)
97 else:
98 raise NotImplementedError
99 try:
100 async with ctx as resp:
101 if resp.status not in (200, 201, 204):
102 log_error(f"Request failed for {req_path}")
103 if method == "GET":
104 raise aiohttp.ClientError(str(resp.status))
105 if method == "GET":
106 return await resp.json()
107 except (ClientConnectorError, ServerTimeoutError):
108 log_error("Recommendation server seems down.")
109 return None
112class DocumentsConnector(ServiceConnector):
113 def __init__(self):
114 super().__init__()
115 self.timeout = 30
117 async def make_request(self, api_path: str, json) -> str | None:
118 await self._check_client()
119 try:
120 async with self.session.post(f"{api_path}/doc", json=json) as resp:
121 if resp.status not in [200, 201]:
122 log_error(
123 f"Document service response status {resp.status}"
124 )
125 return None
126 return await resp.text()
127 except (
128 ClientConnectorError,
129 ServerTimeoutError,
130 asyncio.exceptions.TimeoutError,
131 ):
132 log_error("Documents server seems down.")
133 return None
136class NotificationConnector(ServiceConnector):
137 def __init__(self, api_path: str):
138 super().__init__()
139 self.api_path = api_path
141 async def make_request(self, json, is_multicast=False) -> str | None:
142 await self._check_client()
143 try:
144 async with self.session.post(
145 f"{self.api_path}/notifications{'/multicast' if is_multicast else '' }",
146 json=json,
147 ) as resp:
148 if resp.status in [200, 201, 202]:
149 return await resp.text()
150 log_error(
151 f"Notifications server unauthorized or bad request, status: {resp.status}; content: {await resp.text()}"
152 )
153 except (ClientConnectorError, ServerTimeoutError):
154 log_error("Notification server seems down.")
155 return None
156 logger.error("make_request FAILED")
157 return None