Coverage for api/handlers/users.py: 13%
385 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import collections
3import json
4import uuid
5from contextlib import suppress
7from bson import ObjectId
8from bson.errors import InvalidId
9from database.updater import (
10 remove_user_from_deps,
11 update_subsidiary,
12 update_user,
13)
14from errors import log_error, log_warning
15from exceptions import (
16 BadParameterHTTPError,
17 NoAccessHTTPError,
18 NotFoundHTTPError,
19)
20from fastapi import HTTPException
21from handlers.authorization.check_role import has_role
22from handlers.grabbers.clients import clients_grabber
23from handlers.grabbers.orders import orders_data_grabber
24from handlers.grabbers.users import users_data_grabber
25from keycloak import idp
26from mongodb import (
27 cm_clients_col,
28 subsidiaries_col,
29 users_col,
30 users_webhook_col,
31)
32from pydantic import BaseModel
33from pymongo import ReturnDocument
34from pymongo.errors import OperationFailure
35from services.minio_policy_setter import MinioPolicySetter
36from services.notifications.director import notification_api
37from services.telegram import GROUP_TYPE, group_notification
38from sotrans_fastapi_keycloak._uuid import PydanticUUID
39from sotrans_fastapi_keycloak.api import get_keycloak_user_model
40from sotrans_fastapi_keycloak.exceptions import UserNotFound
41from sotrans_fastapi_keycloak.model import (
42 HTTPMethod,
43 KeycloakRoleModel,
44 user_model_attributes_serializer,
45)
46from sotrans_models.models.misc.client import (
47 ClientDBModel,
48 ManagersClientsDBModel,
49)
50from sotrans_models.models.orders.order import OrderDBModel
51from sotrans_models.models.organizations import SubsidiaryDBModel
52from sotrans_models.models.responses import GenericGetListResponse
53from sotrans_models.models.roles import SotransRole
54from sotrans_models.models.users import (
55 CreateUserWithRoleModel,
56 SotransKeycloakUserCreateModel,
57 SotransKeycloakUserModel,
58 SotransKeycloakUserPublicUpdateModel,
59 SotransKeycloakUserUpdateModel,
60 SotransOIDCUserModel,
61 SotransUserDBFieldsModel,
62 SotransUserDBModel,
63 UpdateKeycloakUserWebhookModel,
64 UpdateRoleModel,
65)
66from sotrans_models.utils.text_mappers import get_users_text_search
67from starlette import status
68from utils.data_grabber import (
69 BaseGetListQueryParams,
70 BaseGetOneQueryParams,
71 adjust_search_query,
72 update_search_query,
73)
74from utils.helper import (
75 clear_user_for_keycloak,
76 get_org_oid,
77 secure_pass_generator,
78)
80company_hierarchy = [
81 SotransRole.admin,
82 SotransRole.company_director,
83 SotransRole.company_manager,
84 SotransRole.company_logistician,
85]
87carrier_hierarchy = [
88 SotransRole.carrier_director,
89 SotransRole.carrier_logistician,
90]
93def get_most_role(roles: list[str]) -> str:
94 if not roles:
95 return ""
96 for comp_role in company_hierarchy:
97 if comp_role.value in roles:
98 return comp_role.value
99 for car_role in carrier_hierarchy:
100 if car_role.value in roles:
101 return car_role.value
102 return ""
105async def _assert_director_access(
106 user_id: str | None, accessor: SotransOIDCUserModel, access_for: str
107):
108 user_roles = accessor.roles
109 if not (
110 idp.check_roles(user_roles, [SotransRole.company_director])
111 or idp.check_roles(user_roles, [SotransRole.carrier_director])
112 ):
113 raise NoAccessHTTPError("роль")
114 if user_id is None:
115 return
116 user = await users_col.find_single("id", uuid.UUID(user_id))
117 if user is None:
118 raise NotFoundHTTPError("пользователь")
119 if not has_role(
120 accessor, SotransRole.company_director
121 ) and SotransUserDBModel(**user).organization_id != str(
122 get_org_oid(executor=accessor)
123 ):
124 raise NoAccessHTTPError(f"{access_for} директором")
127def _assert_user_access(executor):
128 if not idp.check_roles(executor.roles, [SotransRole.company_logistician]):
129 raise NoAccessHTTPError("Different organization")
132async def on_create_users(
133 creation_data: CreateUserWithRoleModel,
134 executor: SotransOIDCUserModel,
135 minio_policy_setter=MinioPolicySetter(),
136):
137 await _assert_director_access(None, executor, "create users")
138 # creation_data = SotransKeycloakUserCreateModel(
139 # **creation_data.model_dump(exclude_unset=True),
140 # organization_id=executor.organization_id
141 # )
142 role = creation_data.role
143 creation_data = SotransKeycloakUserCreateModel(
144 **creation_data.model_dump()
145 )
146 creation_data.organization_id = executor.organization_id
148 initial_roles = []
149 if has_role(executor, SotransRole.company_director):
150 company_employees = (
151 SotransRole.company_director,
152 SotransRole.company_manager,
153 SotransRole.company_logistician,
154 )
155 if role and role in company_employees:
156 initial_roles = [role]
157 else:
158 initial_roles = [SotransRole.company_logistician]
159 elif has_role(executor, SotransRole.carrier_director):
160 carrier_employees = (
161 SotransRole.carrier_director,
162 SotransRole.carrier_logistician,
163 )
165 if role and role in carrier_employees:
166 initial_roles = [role]
167 else:
168 initial_roles = [SotransRole.carrier_logistician]
170 password = None
171 if creation_data.password is None:
172 password = secure_pass_generator()
173 creation_data.password = password
175 # set minio policy according to roles and produce new model
176 updated_creation_data = (
177 minio_policy_setter.create_new_model_with_minio_policy_attr(
178 creation_data, initial_roles
179 )
180 )
182 user: SotransKeycloakUserModel = await idp.create_user(
183 model=updated_creation_data,
184 send_email_verification=False,
185 initial_roles=initial_roles,
186 )
187 new_user = SotransUserDBModel(
188 **{**user.model_dump(), **dict(roles=initial_roles)}
189 )
190 new_user.role = get_most_role(initial_roles)
192 new_user.password = password
193 notification_api.user_registered(new_user)
194 return new_user
197async def on_get_users(
198 executor: SotransOIDCUserModel, params: BaseGetListQueryParams
199) -> GenericGetListResponse[SotransUserDBModel]:
200 user_org_oid = get_org_oid(executor)
201 query = {SotransUserDBFieldsModel.organization_id: str(user_org_oid)}
202 params.where = update_search_query(params.where, query)
203 return await users_data_grabber.get_list(params)
206async def on_get_user(
207 user_id: uuid.UUID,
208 executor: SotransOIDCUserModel,
209 params: BaseGetOneQueryParams,
210):
211 q = {
212 SotransUserDBFieldsModel.organization_id: str(get_org_oid(executor)),
213 SotransUserDBFieldsModel.id: user_id,
214 }
215 user = await users_col.collection.find_one(q, projection=params.projection)
216 if user is None:
217 raise NotFoundHTTPError("пользователь")
218 return user
221def update_kc_int(d, u):
222 for k, v in u.items():
223 if isinstance(v, collections.abc.Mapping):
224 d[k] = update_kc_int(d.get(k, {}), v)
225 else:
226 d[k] = v
227 return d
230async def on_update_user(
231 user_id: str,
232 update_data: SotransKeycloakUserUpdateModel
233 | SotransKeycloakUserPublicUpdateModel,
234 executor: SotransOIDCUserModel,
235):
236 if isinstance(update_data, SotransKeycloakUserUpdateModel):
237 await _assert_director_access(
238 user_id, executor, "обновление приватной информации"
239 )
240 user = await users_col.find_single("id", uuid.UUID(user_id))
241 if user is None:
242 raise NotFoundHTTPError("пользователь")
243 if update_data.password is not None:
244 await idp.change_password(user_id, update_data.password)
245 clear_user_for_keycloak(user)
246 user_kc_model = SotransUserDBModel(**user)
247 user_in_db_kc = user_model_attributes_serializer(user_kc_model)
248 user_update_data_kc = get_keycloak_user_model(update_data)
250 update_kc_int(user_in_db_kc, user_update_data_kc)
251 clear_user_for_keycloak(user_in_db_kc)
253 response = await idp._admin_request(
254 url=f"{idp.users_uri}/{user_id}",
255 data=user_in_db_kc,
256 method=HTTPMethod.PUT,
257 )
258 if response.status == 204: # Update successful
259 with suppress(ValueError):
260 role = SotransRole(update_data.role)
261 carrier_employees = (
262 SotransRole.carrier_director,
263 SotransRole.carrier_logistician,
264 )
265 company_employees = (
266 SotransRole.company_director,
267 SotransRole.company_manager,
268 SotransRole.company_logistician,
269 )
271 async def update_role():
272 current_roles = await idp.get_user_roles(user_id)
273 await idp.remove_user_roles(
274 [cr.name for cr in current_roles], user_id
275 )
276 await idp.add_user_roles([role], user_id)
278 if has_role(executor, SotransRole.company_director):
279 if role in company_employees:
280 await update_role()
281 elif has_role(executor, SotransRole.carrier_director):
282 if role in carrier_employees:
283 await update_role()
285 sync_update = {**user, **update_data.model_dump(exclude_unset=True)}
286 sync_up_model = SotransUserDBModel(**sync_update)
287 if sync_up_model.roles and update_data.role:
288 sync_up_model.roles = [update_data.role]
290 return await users_col.collection.find_one_and_update(
291 {"id": sync_up_model.id},
292 {"$set": sync_up_model.model_dump(exclude_unset=True)},
293 return_document=ReturnDocument.AFTER,
294 )
296 log_error(f"{response.status}: {await response.content.read()}")
297 raise HTTPException(
298 status.HTTP_503_SERVICE_UNAVAILABLE,
299 "Запрос на обновление данных пользователя был провален.",
300 )
303async def on_delete_user(user_id: str, executor: SotransOIDCUserModel) -> dict:
304 await _assert_director_access(user_id, executor, "удаление пользователя")
305 return await idp.delete_user(user_id=user_id)
308async def update_users_roles(
309 user_id: str, executor: SotransOIDCUserModel, role: SotransRole
310):
311 if (
312 await idp.get_user(user_id=user_id)
313 ).organization_id != executor.organization_id:
314 raise NoAccessHTTPError("Employee")
315 carrier_employees = (
316 SotransRole.carrier_director,
317 SotransRole.carrier_logistician,
318 )
319 company_employees = (
320 SotransRole.company_director,
321 SotransRole.company_manager,
322 SotransRole.company_logistician,
323 )
324 carrier_chief = (
325 has_role(executor, SotransRole.carrier_director)
326 and role in carrier_employees
327 )
328 company_chief = (
329 has_role(executor, SotransRole.company_director)
330 and role in company_employees
331 )
333 if not (company_chief or carrier_chief):
334 raise NoAccessHTTPError("Roles")
335 current_roles = await idp.get_user_roles(user_id)
336 cur_roles_set = {cr.name for cr in current_roles}
337 if company_chief and not cur_roles_set & set(company_employees):
338 raise NoAccessHTTPError("Employees role")
339 if carrier_chief and not cur_roles_set & set(carrier_employees):
340 raise NoAccessHTTPError("Employees role")
341 # 'default-roles-sotransdev' gives ability to edit data in keycloack
342 # resulting_roles: list[str] = []
343 # with suppress(ValueError):
344 # current_roles.remove('default-roles-sotransdev')
345 # resulting_roles.append('default-roles-sotransdev')
346 await idp.remove_user_roles([cr.name for cr in current_roles], user_id)
347 await idp.add_user_roles([role], user_id)
348 return UpdateRoleModel(role=role)
351async def update_subsidiary_by_user_id(
352 user_id: str, sub_id: str | None, needs_result: bool
353) -> SotransKeycloakUserModel | None:
354 user = SotransKeycloakUserUpdateModel(
355 **(await idp.get_user(user_id)).model_dump()
356 )
357 user.subsidiary_id = sub_id
358 user_data = get_keycloak_user_model(await idp.get_user(user_id))
359 updated_user_data = get_keycloak_user_model(user)
360 update_kc_int(user_data, updated_user_data)
361 clear_user_for_keycloak(user_data)
362 await idp._admin_request(
363 url=f"{idp.users_uri}/{user_id}", data=user_data, method=HTTPMethod.PUT
364 )
365 if needs_result is False:
366 return None
367 return await idp.get_user(user_id=user_id)
370async def assign_sub_to_user(
371 user_id: str, sub_id: str | None, needs_result: bool = False
372) -> SotransKeycloakUserModel | None:
373 user = await update_subsidiary_by_user_id(user_id, sub_id, needs_result)
374 if needs_result is False:
375 return None
376 return user
379async def on_update_subsidiary(
380 user_id: str, sub_id: str
381) -> SotransKeycloakUserModel:
382 try:
383 oid = ObjectId(sub_id)
384 except InvalidId:
385 raise BadParameterHTTPError("sub_id")
386 named_sub = await subsidiaries_col.find_single(
387 "_id", oid, projection={"name": 1}
388 )
389 if named_sub is None:
390 raise NotFoundHTTPError("подразделение")
391 try:
392 return await assign_sub_to_user(user_id, sub_id, needs_result=True)
393 except UserNotFound:
394 raise NotFoundHTTPError("пользователь")
397def parse_role(data: dict) -> SotransRole | None:
398 role_update: str = data["representation"]
399 role_update_json = json.loads(role_update)
400 try:
401 role = role_update_json[0]["name"]
402 except IndexError:
403 return None
404 with suppress(ValueError):
405 return SotransRole(role)
406 return None
409async def on_add_role(data: dict):
410 res_path: str = data["resourcePath"]
411 up_user_id = uuid.UUID(res_path.split("/")[1])
412 role_enum = parse_role(data)
413 if not role_enum:
414 return
415 user = await users_col.find_single("id", up_user_id)
416 if not user:
417 log_warning(
418 f"No user with uuid {up_user_id} for role {role_enum.value}"
419 )
420 return
421 user_model = SotransUserDBModel(**user)
423 if role_enum not in user_model.roles:
424 user_model.roles.append(role_enum)
425 most_role = get_most_role(user_model.roles)
426 await users_col.collection.update_one(
427 {SotransUserDBFieldsModel.id: up_user_id},
428 {
429 "$set": {SotransUserDBFieldsModel.role: most_role},
430 "$push": {SotransUserDBFieldsModel.roles[0]: role_enum.value},
431 },
432 )
433 user_model.role = most_role
434 await update_user(up_user_id, user_model)
437class UserUpdateTimestamp(BaseModel):
438 user_id: PydanticUUID
439 timestamp: float
442async def check_ts(data: dict, user_id: uuid.UUID):
443 ts = data.get("proxy_timestamp")
444 if not ts:
445 return False
446 user_ts = await users_webhook_col.find_single("user_id", user_id)
447 if user_ts is None:
448 uts_model = UserUpdateTimestamp(user_id=user_id, timestamp=ts)
449 await users_webhook_col.create(uts_model.model_dump())
450 return True
451 user_ts_model = UserUpdateTimestamp(**user_ts)
452 if user_ts_model.timestamp > ts:
453 return False
454 await users_webhook_col.collection.update_one(
455 {"user_id": user_id, "timestamp": ts}
456 )
457 return True
460async def on_user_update_wh(data: dict):
461 group_notification(f"#user-update \n{data}", mode=GROUP_TYPE.INFO)
462 python_payload = json.loads(data["representation"])
463 user_update_data = UpdateKeycloakUserWebhookModel(**python_payload)
464 if not await check_ts(data, user_update_data.id):
465 return
466 previous_user = await users_col.find_single("id", user_update_data.id)
467 if not previous_user:
468 return
469 roles = previous_user.get("roles") or []
470 pu_model = SotransUserDBModel(**previous_user)
471 to_update_vector = (
472 True
473 if pu_model.surname != user_update_data.surname
474 or pu_model.name != user_update_data.name
475 or pu_model.patronymic != user_update_data.patronymic
476 else False
477 )
478 user = SotransUserDBModel(
479 **user_update_data.model_dump(), roles=roles, role=get_most_role(roles)
480 )
481 user.text_search = get_users_text_search(user)
482 previous_user_model = SotransUserDBModel(**previous_user)
483 change_subsidiary = previous_user_model.subsidiary_id != user.subsidiary_id
484 if change_subsidiary:
485 if user.subsidiary_id is not None:
486 with suppress(InvalidId):
487 soid = ObjectId(user.subsidiary_id)
488 subsidiary_name_holder = await subsidiaries_col.find_single(
489 "_id", soid, projection={"name": 1}
490 )
491 if subsidiary_name_holder:
492 user.subsidiary_name = subsidiary_name_holder["name"]
493 else:
494 user.subsidiary_name = None
495 await users_col.collection.update_one(
496 {SotransUserDBFieldsModel.id: user_update_data.id},
497 {"$set": user.model_dump()},
498 )
500 await update_user(
501 user_update_data.id,
502 user,
503 change_subsidiary=change_subsidiary
504 and previous_user_model.subsidiary_id is not None,
505 update_vector_ts=to_update_vector,
506 )
509async def on_remove_role(data: dict):
510 res_path: str = data["resourcePath"]
511 up_user_id = uuid.UUID(res_path.split("/")[1])
512 role_enum = parse_role(data)
513 if not role_enum:
514 return
515 user = await users_col.find_single("id", up_user_id)
516 if not user:
517 return
518 user_model = SotransUserDBModel(**user)
519 if role_enum in user_model.roles:
520 user_model.roles.remove(role_enum)
521 user_model.role = get_most_role(user_model.roles)
522 await users_col.collection.update_one(
523 {SotransUserDBFieldsModel.id: up_user_id},
524 {
525 "$set": {SotransUserDBFieldsModel.role: user_model.role},
526 "$pull": {SotransUserDBFieldsModel.roles[0]: role_enum.value},
527 },
528 )
529 await update_user(up_user_id, user_model)
532async def on_user_delete(data: dict):
533 res_path: str = data["resourcePath"]
534 user_id = uuid.UUID(res_path.split("/")[1])
535 await users_col.collection.delete_one(
536 {SotransUserDBFieldsModel.id: user_id}
537 )
538 await remove_user_from_deps(user_id)
541async def on_user_create(data: dict):
542 res_path: str = data["resourcePath"]
543 user_id = uuid.UUID(res_path.split("/")[1])
544 python_payload = json.loads(data["representation"])
545 user_update_data = UpdateKeycloakUserWebhookModel(**python_payload)
546 user_update_data.id = user_id
547 user = SotransUserDBModel(**user_update_data.model_dump())
548 user.text_search = get_users_text_search(user)
549 await users_col.collection.insert_one(user.model_dump())
550 if user.subsidiary_id is None:
551 return
552 try:
553 sub_id = ObjectId(user.subsidiary_id)
554 except InvalidId:
555 return
556 try:
557 subsidiary = await subsidiaries_col.collection.find_one_and_update(
558 {"_id": sub_id},
559 {"$push": {SubsidiaryDBModel.employees[0]: user.model_dump()}},
560 return_document=ReturnDocument.AFTER,
561 )
562 except OperationFailure:
563 subsidiary = await subsidiaries_col.collection.find_one_and_update(
564 {"_id": sub_id},
565 {"$set": {SubsidiaryDBModel.employees[0]: [user.model_dump()]}},
566 return_document=ReturnDocument.AFTER,
567 )
568 if subsidiary:
569 subs_model = SubsidiaryDBModel(**subsidiary)
570 await update_subsidiary(subs_model)
573async def on_user_register(data: dict):
574 user_id = data["userId"]
575 for t in range(60):
576 try:
577 user_data = await idp.get_user(user_id)
578 except UserNotFound:
579 await asyncio.sleep(0.7)
580 else:
581 log_warning(f"{user_id} registered on try {t}")
582 break
583 else:
584 log_error(f"Failed to find user, webhook data:\n{data}")
585 return
586 to_create = SotransUserDBModel(**user_data.model_dump())
587 to_create.text_search = get_users_text_search(to_create)
588 await users_col.collection.insert_one(to_create.model_dump())
591async def keycloak_webhook_handler(data: dict):
592 event_type = data.get("type")
593 if event_type:
594 match event_type:
595 case "USER-UPDATE":
596 await on_user_update_wh(data)
597 case "REALM_ROLE_MAPPING-CREATE":
598 await asyncio.sleep(3)
599 await on_add_role(data)
600 case "REALM_ROLE_MAPPING-DELETE":
601 await on_remove_role(data)
602 case "USER-CREATE":
603 await on_user_create(data)
604 case "REGISTER":
605 asyncio.create_task(on_user_register(data))
606 case "USER-DELETE":
607 await on_user_delete(data)
610async def on_get_assigned_to_employee_clients(
611 user_id: uuid.UUID, params: BaseGetListQueryParams
612) -> GenericGetListResponse[ClientDBModel]:
613 assigned_clients = await cm_clients_col.find_single(
614 ManagersClientsDBModel.manager_id, user_id
615 )
616 if not assigned_clients:
617 return GenericGetListResponse[ClientDBModel](items=[], total=0)
618 clients_ids = ManagersClientsDBModel(**assigned_clients).clients_ids
619 params.where = adjust_search_query(
620 params.where, "id", {str(ci) for ci in clients_ids}
621 )
622 return await clients_grabber.get_list(params)
625async def on_get_assigned_orders_to_employee(
626 user_id: uuid.UUID, params: BaseGetListQueryParams
627) -> GenericGetListResponse[OrderDBModel]:
628 params.where = update_search_query(
629 params.where, {OrderDBModel.assigned.company.employee.id: str(user_id)}
630 )
631 return await orders_data_grabber.get_list(params)
634def parse_user_roles(roles: list[KeycloakRoleModel]) -> list[str]:
635 result = []
636 for role in roles:
637 with suppress(ValueError):
638 result.append(SotransRole(role.name).value)
639 return result
642async def users_sync_batch_processing(
643 users_batch: list[SotransKeycloakUserModel],
644):
645 for user in users_batch:
646 user_roles = await idp.get_user_roles(str(user.id))
647 role_names = parse_user_roles(user_roles)
648 most_role = get_most_role(role_names)
649 try:
650 if user.subsidiary_id is None:
651 raise InvalidId
652 sub_id = ObjectId(user.subsidiary_id)
653 except (InvalidId, TypeError):
654 sub_name = None
655 else:
656 sub = await subsidiaries_col.find_single(
657 "_id", sub_id, projection={"name": 1}
658 )
659 if sub is None:
660 sub_name = None
661 else:
662 sub_name = sub["name"]
663 db_user = SotransUserDBModel(
664 **{**user.model_dump(), **dict(roles=role_names, role=most_role)},
665 subsidiary_name=sub_name,
666 )
667 db_user.text_search = get_users_text_search(db_user)
668 await users_col.collection.update_one(
669 {"id": user.id},
670 {"$set": db_user.model_dump()},
671 upsert=True,
672 )
675async def iterate_for_sync():
676 limit = 200
677 skip = 0
678 timeout = idp.timeout
679 idp.timeout = 150
680 try:
681 users = await idp.get_all_users(skip, limit)
682 while users:
683 await users_sync_batch_processing(users)
684 skip += limit
685 users = await idp.get_all_users(skip, limit)
686 except Exception:
687 log_error("Keycloak sync crash")
688 # print("Sync fail")
689 else:
690 log_warning("Keycloak sync run finished successfully")
691 # print("Sync success")
692 finally:
693 idp.timeout = timeout