Coverage for api/utils/access_wrapper.py: 37%

28 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from typing import Annotated 

2 

3from fastapi import Depends, HTTPException 

4from handlers.authorization.check_role import has_role 

5from keycloak import idp 

6from mongodb import orgs_col 

7from sotrans_models.models.misc.verification import VerificationStatus 

8from sotrans_models.models.organizations import ( 

9 InnVerificationStatus, 

10 OrganizationDBModel, 

11) 

12from sotrans_models.models.roles import SotransRole 

13from sotrans_models.models.users import SotransOIDCUserModel 

14from starlette import status 

15from utils.helper import get_org_oid 

16 

17 

18def get_active_user( 

19 roles: list[SotransRole], 

20 is_organization_active: bool = True, 

21 company_inn_verification: InnVerificationStatus = InnVerificationStatus.success, 

22 company_security_verification: VerificationStatus 

23 | None = VerificationStatus.accepted, 

24): 

25 async def get_current_access( 

26 user: Annotated[ 

27 SotransOIDCUserModel, 

28 Depends(idp.get_current_user(required_role_names=roles)), 

29 ], 

30 ) -> SotransOIDCUserModel: 

31 if not has_role(user, SotransRole.carrier_logistician) or has_role( 

32 user, SotransRole.company_logistician 

33 ): 

34 return user 

35 oid = get_org_oid(user) 

36 organization = await orgs_col.find_single("_id", oid) 

37 organization_model = OrganizationDBModel(**organization) 

38 if is_organization_active is not organization_model.is_active: 

39 raise HTTPException( 

40 status.HTTP_403_FORBIDDEN, 

41 f"Статус организации должен быть {is_organization_active} для выполнения действия.", 

42 ) 

43 if ( 

44 company_inn_verification 

45 != organization_model.inn_verification_status 

46 ): 

47 raise HTTPException( 

48 status.HTTP_403_FORBIDDEN, 

49 f"Верификация должна иметь значение {company_inn_verification} для выполнения действия.", 

50 ) 

51 if ( 

52 company_security_verification 

53 and not organization_model.verification 

54 ): 

55 raise HTTPException( 

56 status.HTTP_403_FORBIDDEN, 

57 "Служба безопасности должна верифицировать ваш доступ.", 

58 ) 

59 if ( 

60 company_security_verification 

61 and organization_model.verification 

62 and organization_model.verification.status 

63 != company_security_verification 

64 ): 

65 raise HTTPException( 

66 status.HTTP_403_FORBIDDEN, 

67 "Верификация службой безопасности не соответствует.", 

68 ) 

69 return user 

70 

71 return get_current_access