Coverage for api/handlers/authorization/assigned_filters.py: 16%

91 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from typing import Any, Literal 

2from uuid import UUID 

3 

4from bson import ObjectId 

5from bson.errors import InvalidId 

6from exceptions import BadParameterHTTPError, SubsidiaryIsNotAssigned 

7from handlers.authorization.check_role import has_role 

8from handlers.authorization.company_employee_access import ( 

9 get_assigned_clients_ids, 

10) 

11from mongodb import cm_clients_col 

12from sotrans_models.models.misc.client import ManagersClientsDBModel 

13from sotrans_models.models.roles import SotransRole 

14from sotrans_models.models.users import SotransOIDCUserModel 

15from utils.helper import get_subsidiary_oid 

16 

# A single assignment-filter entry: a mapping whose keys are limited to
# "type" and "subject", with string values.
AssignedFilter = dict[Literal["type", "subject"], str]

# Entry selecting the "client" filter type.
BY_CLIENT: AssignedFilter = {"type": "client"}

# The pair of entries selecting the "subsidiary" and "direct" filter types,
# in that order.
ASSIGNED: tuple[AssignedFilter, AssignedFilter] = (
    {"type": "subsidiary"},
    {"type": "direct"},
)

23 

24 

async def match_filters_for_not_privileged(
    case: AssignedFilter, user: SotransOIDCUserModel, filters: list[dict]
):
    """Append the query condition for one assignment case to ``filters``.

    The condition is always built from ``user`` itself; any ``"subject"``
    key in ``case`` is not read here.

    Args:
        case: Assignment entry; only its ``"type"`` key is used.
        user: The requesting user.
        filters: List of query conditions, extended in place.

    Returns:
        None. A ``case`` whose type is not ``"direct"``, ``"client"`` or
        ``"subsidiary"`` leaves ``filters`` untouched.
    """
    kind = case["type"]
    if kind == "direct":
        filters.append({"assigned.company.employee.id": user.sub})
    elif kind == "client":
        assigned_ids = await get_assigned_clients_ids(user)
        filters.append({"client.id": {"$in": assigned_ids}})
    elif kind == "subsidiary":
        try:
            subsidiary_id = await get_subsidiary_oid(user)
        except SubsidiaryIsNotAssigned:
            # No subsidiary for this user: add a condition that requires
            # `_id` to be absent instead of a subsidiary match.
            condition = {"_id": {"$exists": False}}
        else:
            condition = {"assigned.company.subsidiary.id": subsidiary_id}
        filters.append(condition)

40 

41 

async def get_subject_for_director(
    user: SotransOIDCUserModel, case: AssignedFilter
) -> UUID | ObjectId:
    """Resolve the id that an assignment case should be filtered by.

    When ``case`` carries an explicit ``"subject"``, it is parsed first as
    an ``ObjectId`` and, failing that, as a ``UUID``. Otherwise the subject
    is derived from ``user`` according to ``case["type"]``.

    Args:
        user: The requesting user.
        case: Assignment entry with a ``"type"`` and optional ``"subject"``.

    Returns:
        The parsed explicit subject, the user's subsidiary id for type
        ``"subsidiary"``, or ``user.sub`` for types ``"direct"`` and
        ``"client"``.

    Raises:
        BadParameterHTTPError: If the explicit subject is neither a valid
            ``ObjectId`` nor a valid ``UUID``, or if no subject is given
            and the type is not one of the three known values.
    """
    if "subject" in case:
        raw_subject = case["subject"]
        try:
            return ObjectId(raw_subject)
        except InvalidId:
            # Not an ObjectId; fall back to interpreting it as a UUID.
            try:
                return UUID(raw_subject)
            except ValueError:
                raise BadParameterHTTPError("неверный subject")

    kind = case["type"]
    if kind == "subsidiary":
        # Exceptions raised by get_subsidiary_oid propagate to the caller.
        return await get_subsidiary_oid(user)
    if kind == "direct" or kind == "client":
        return user.sub
    raise BadParameterHTTPError("неизвестный тип")

63 

64 

async def make_filter_from_assignment(
    assignment: list[AssignedFilter], user: SotransOIDCUserModel
) -> dict[str, Any]:
    """Build one query filter from a list of assignment cases.

    For a user with the ``company_director`` role, each case's subject is
    resolved through ``get_subject_for_director`` and only the first case
    of each type ("direct", "client", "subsidiary") contributes a
    condition. For any other user every case is delegated to
    ``match_filters_for_not_privileged``.

    Args:
        assignment: Assignment cases to translate.
        user: The requesting user.

    Returns:
        ``{}`` when no condition was produced, the condition itself when
        there is exactly one, otherwise ``{"$or": [...]}`` over all of them.
    """
    conditions: list[dict] = []

    if has_role(user, SotransRole.company_director):
        handled: set[str] = set()
        # Stays set for the rest of the loop once a subsidiary lookup failed.
        subsidiary_unresolved = False
        for case in assignment:
            # The subject is resolved (and validated) for every case, even
            # one that is skipped below as a repeated type.
            try:
                subject = await get_subject_for_director(user, case)
            except SubsidiaryIsNotAssigned:
                subsidiary_unresolved = True

            kind = case["type"]
            if kind == "direct":
                if "direct" not in handled:
                    handled.add("direct")
                    conditions.append(
                        {"assigned.company.employee.id": subject}
                    )
            elif kind == "client":
                if "client" not in handled:
                    handled.add("client")
                    record = await cm_clients_col.find_single(
                        "manager_id", subject
                    )
                    if record:
                        client_ids = ManagersClientsDBModel(
                            **record
                        ).clients_ids
                    else:
                        client_ids = []
                    conditions.append({"client.id": {"$in": client_ids}})
            elif kind == "subsidiary":
                if "subsidiary" not in handled:
                    handled.add("subsidiary")
                    if subsidiary_unresolved:
                        # No subsidiary could be resolved: require `_id`
                        # to be absent instead of matching a subsidiary.
                        conditions.append({"_id": {"$exists": False}})
                    else:
                        conditions.append(
                            {"assigned.company.subsidiary.id": subject}
                        )
    else:
        for case in assignment:
            await match_filters_for_not_privileged(case, user, conditions)

    if not conditions:
        return {}
    if len(conditions) == 1:
        return conditions[0]
    return {"$or": conditions}

112 

113 

def ensure_default_clients(assignment: list[AssignedFilter]):
    """Default an empty assignment list to ``BY_CLIENT``, in place.

    A non-empty list is left unchanged. The module-level ``BY_CLIENT``
    object itself is appended, not a copy.
    """
    if assignment:
        return
    assignment.append(BY_CLIENT)

117 

118 

def ensure_default_clients_assignment(assignment: list[AssignedFilter]):
    """Default an empty assignment list to ``BY_CLIENT`` plus ``ASSIGNED``.

    The list is filled in place, ``BY_CLIENT`` first and then the entries
    of ``ASSIGNED`` in order. A non-empty list is left unchanged. The
    module-level objects themselves are inserted, not copies.
    """
    if assignment:
        return
    assignment.extend((BY_CLIENT, *ASSIGNED))

123 

124 

def ensure_default_assigned(assignment: list[AssignedFilter]):
    """Default an empty assignment list to the ``ASSIGNED`` entries, in place.

    A non-empty list is left unchanged. The module-level ``ASSIGNED``
    entries themselves are inserted, not copies.
    """
    if assignment:
        return
    assignment.extend(ASSIGNED)

127 assignment.extend(ASSIGNED)