Coverage for api/handlers/subsidiaries.py: 18%

97 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from contextlib import suppress 

2from typing import Any 

3 

4from bson import ObjectId 

5from database.entity import update_etag 

6from database.updater import remove_subsidiary_from_deps, update_subsidiary 

7from exceptions import NotFoundHTTPError 

8from handlers.grabbers.clients import clients_grabber 

9from handlers.users import assign_sub_to_user 

10from mongodb import ( 

11 clients_col, 

12 subsidiaries_clients_col, 

13 subsidiaries_col, 

14 users_col, 

15) 

16from pymongo import ReturnDocument 

17from sotrans_fastapi_keycloak.exceptions import UserNotFound 

18from sotrans_models.models._base import InsertByOIDModel 

19from sotrans_models.models.misc.client import ( 

20 ClientDBModel, 

21 SubsidiaryClientsModel, 

22) 

23from sotrans_models.models.organizations import ( 

24 CreateSubsidiaryModel, 

25 SubsidiaryDBModel, 

26 UpdateSubsidiaryModel, 

27) 

28from sotrans_models.models.responses import GenericGetListResponse 

29from utils.data_grabber import BaseGetListQueryParams, adjust_search_query 

30from utils.helper import EntityHash, etag_detalizer, get_hash 

31 

32 

async def get_employees(
    sub: CreateSubsidiaryModel | UpdateSubsidiaryModel,
) -> list[dict[str, Any]]:
    """Collect the stored user records for the employees referenced by ``sub``.

    Each employee reference's ``id`` is looked up in ``users_col`` one at a
    time; references without a matching record are skipped.

    Returns:
        The found user records, in the order of ``sub.employees``. Empty when
        ``sub`` carries no employees.
    """
    found: list[dict[str, Any]] = []
    for reference in sub.employees or ():
        user = await users_col.find_single("id", reference.id)
        if not user:
            # No record for this id: leave it out of the result.
            continue
        found.append(user)
    return found

43 

44 

async def on_create_subsidiary(
    sub: CreateSubsidiaryModel,
) -> SubsidiaryDBModel | dict[str, Any]:
    """Store a new subsidiary and assign it to each of its listed employees.

    The employee references in the payload are replaced by the result of a
    batch lookup in ``users_col`` before the etag is computed, so the hash
    covers the stored form of the document.

    Returns:
        Whatever ``subsidiaries_col.create`` returns for the new record.
    """
    payload = sub.model_dump()
    employee_refs = sub.employees

    if employee_refs:
        wanted_ids = [ref.id for ref in employee_refs]
        employees_key = SubsidiaryDBModel.employees[0]
        payload[employees_key] = await users_col.find_batch(
            {"id": {"$in": wanted_ids}}
        )

    created = await subsidiaries_col.create(payload | {"etag": get_hash(payload)})

    if employee_refs:
        new_sub_id = str(created["id"])
        for ref in employee_refs:
            # Best effort: a reference to an unknown user does not abort the
            # creation, it is simply not assigned.
            with suppress(UserNotFound):
                await assign_sub_to_user(str(ref.id), new_sub_id)

    return created

65 

66 

async def on_update_subsidiary(
    sub_id: ObjectId, subdivision: UpdateSubsidiaryModel
) -> dict[str, Any] | SubsidiaryDBModel:
    """Apply a partial update to a subsidiary and sync employee assignments.

    Only fields explicitly set on ``subdivision`` are written. A non-empty
    ``employees`` list is kept out of the ``$set``; each listed user is passed
    to ``assign_sub_to_user`` instead. When ``employees`` is explicitly sent
    as empty/``None``, the users that were attached to the subsidiary *before*
    the update are un-assigned.

    Args:
        sub_id: ``_id`` of the subsidiary to update.
        subdivision: Partial update payload; its ``etag``, when not ``None``,
            must match the stored document.

    Returns:
        The updated subsidiary as a ``SubsidiaryDBModel``.

    Raises:
        NotFoundHTTPError: No document matched ``sub_id`` (and the etag, if
            given). ``etag_detalizer`` runs first and presumably raises a more
            specific error on an etag mismatch — confirm against its
            implementation.
    """
    update_data = subdivision.model_dump(exclude_unset=True)
    employees_sent = "employees" in subdivision.model_fields_set
    if subdivision.employees:
        # pop() instead of del: tolerate a payload whose truthy employees
        # value was not part of the dumped (explicitly set) fields.
        update_data.pop("employees", None)
    etag_q = {"etag": subdivision.etag} if subdivision.etag is not None else {}

    # The update below returns the document AFTER the write, in which a
    # cleared employees field is already empty. Capture the previous employees
    # first so they can actually be un-assigned afterwards.
    former_employees = []
    if employees_sent and not subdivision.employees:
        previous = await subsidiaries_col.collection.find_one(
            {"_id": sub_id} | etag_q
        )
        if previous:
            former_employees = SubsidiaryDBModel(**previous).employees or []

    updated = await subsidiaries_col.collection.find_one_and_update(
        {"_id": sub_id} | etag_q,
        {"$set": update_data},
        return_document=ReturnDocument.AFTER,
    )
    if not updated:
        await etag_detalizer(subsidiaries_col, etag_q, {"_id": sub_id})
        raise NotFoundHTTPError("филиал")

    await update_etag(updated, subsidiaries_col)
    updated_model = SubsidiaryDBModel(**updated)
    await update_subsidiary(updated_model)

    if employees_sent:
        if subdivision.employees:
            for employee in subdivision.employees:
                # Unknown users are skipped rather than failing the update.
                with suppress(UserNotFound):
                    await assign_sub_to_user(str(employee.id), str(sub_id))
        else:
            # Fall back to the post-update list to keep the previous behavior
            # should the stored document still carry employees.
            to_unassign = former_employees or updated_model.employees or []
            for empl in to_unassign:
                with suppress(UserNotFound):
                    await assign_sub_to_user(str(empl.id), None)
    return updated_model

95 

96 

async def on_remove_subsidiary(sub_id: ObjectId, etag: EntityHash | None):
    """Delete a subsidiary, un-assign its employees and clean up dependents.

    Args:
        sub_id: ``_id`` of the subsidiary to delete.
        etag: When truthy, the stored document must carry this etag to be
            deleted.

    Raises:
        NotFoundHTTPError: Nothing was deleted. ``etag_detalizer`` is awaited
            first with the same etag filter.
    """
    etag_filter = {}
    if etag:
        etag_filter = {"etag": etag}

    deleted = await subsidiaries_col.collection.find_one_and_delete(
        {"_id": sub_id} | etag_filter
    )
    if deleted is None:
        await etag_detalizer(subsidiaries_col, etag_filter, {"_id": sub_id})
        raise NotFoundHTTPError("подразделение")

    # The deleted document is the only remaining source of the employee list.
    former_staff = SubsidiaryDBModel(**deleted).employees
    if former_staff:
        for member in former_staff:
            # A user that no longer exists needs no un-assignment.
            with suppress(UserNotFound):
                await assign_sub_to_user(str(member.id), None)

    await remove_subsidiary_from_deps(sub_id)

111 

112 

async def on_add_client_to_subsidiary(
    subsidiary_id: ObjectId, client_id: ObjectId
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Attach a client to a subsidiary.

    Writes the subsidiary reference onto the client document, then adds the
    client id to the subsidiary's ``clients_ids`` set, creating the link
    document if it does not exist yet.

    Returns:
        The subsidiary-clients link document as it looks after the update.

    Raises:
        NotFoundHTTPError: The subsidiary does not exist.
    """
    if not await subsidiaries_col.find_single("_id", subsidiary_id):
        raise NotFoundHTTPError("subsidiary")

    client_filter = {"_id": client_id}
    set_subsidiary = {
        "$set": {
            ClientDBModel.subsidiary: InsertByOIDModel(
                id=subsidiary_id
            ).model_dump()
        }
    }
    await clients_col.collection.update_one(client_filter, set_subsidiary)

    # $addToSet keeps clients_ids free of duplicates on repeated calls.
    return await subsidiaries_clients_col.collection.find_one_and_update(
        {"subsidiary_id": subsidiary_id},
        {"$addToSet": {"clients_ids": client_id}},
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )

136 

137 

async def revoke_subsidiary(
    subsidiary_id: ObjectId, client_id: ObjectId
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Remove a client id from a subsidiary's client list.

    When the removal leaves the list empty, the whole link document is
    deleted.

    Returns:
        The link as it looks after the pull, or an empty
        ``SubsidiaryClientsModel`` when the subsidiary has no link document.
    """
    link_filter = {"subsidiary_id": subsidiary_id}
    remaining = await subsidiaries_clients_col.collection.find_one_and_update(
        link_filter,
        {"$pull": {"clients_ids": client_id}},
        return_document=ReturnDocument.AFTER,
    )
    if not remaining:
        return SubsidiaryClientsModel()

    link = SubsidiaryClientsModel(**remaining)
    if not link.clients_ids:
        # Nothing left to track for this subsidiary: drop the empty link.
        await subsidiaries_clients_col.collection.delete_one(
            {"subsidiary_id": subsidiary_id}
        )
    return link

154 

155 

async def on_get_assigned_clients_by_sub_id(
    subsidiary_id: ObjectId,
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Return the client-assignment record stored for a subsidiary.

    Raises:
        NotFoundHTTPError: No assignment record exists for ``subsidiary_id``.
    """
    record = await subsidiaries_clients_col.find_single(
        "subsidiary_id", subsidiary_id
    )
    if record:
        return record
    raise NotFoundHTTPError("subsidiary_assignment")

165 

166 

async def on_get_assigned_clients_full_by_sub(
    sub_id: ObjectId, params: BaseGetListQueryParams
) -> GenericGetListResponse[ClientDBModel]:
    """List the full client records assigned to a subsidiary.

    The caller's ``params.where`` is rewritten in place through
    ``adjust_search_query`` with the assigned client ids (as strings) under
    the ``"id"`` key, and the listing is delegated to ``clients_grabber``.

    Returns:
        An empty response (``total=0``) when the subsidiary has no assignment
        record, otherwise the result of ``clients_grabber.get_list``.
    """
    link = await subsidiaries_clients_col.find_single("subsidiary_id", sub_id)
    if not link:
        return GenericGetListResponse(total=0, items=[])

    assigned = SubsidiaryClientsModel(**link).clients_ids
    # A missing/empty id list yields an empty set, same as an empty loop.
    id_strings = set(map(str, assigned or ()))
    params.where = adjust_search_query(params.where, "id", id_strings)
    return await clients_grabber.get_list(params)