Coverage for api/handlers/subsidiaries.py: 18%
97 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from contextlib import suppress
2from typing import Any
4from bson import ObjectId
5from database.entity import update_etag
6from database.updater import remove_subsidiary_from_deps, update_subsidiary
7from exceptions import NotFoundHTTPError
8from handlers.grabbers.clients import clients_grabber
9from handlers.users import assign_sub_to_user
10from mongodb import (
11 clients_col,
12 subsidiaries_clients_col,
13 subsidiaries_col,
14 users_col,
15)
16from pymongo import ReturnDocument
17from sotrans_fastapi_keycloak.exceptions import UserNotFound
18from sotrans_models.models._base import InsertByOIDModel
19from sotrans_models.models.misc.client import (
20 ClientDBModel,
21 SubsidiaryClientsModel,
22)
23from sotrans_models.models.organizations import (
24 CreateSubsidiaryModel,
25 SubsidiaryDBModel,
26 UpdateSubsidiaryModel,
27)
28from sotrans_models.models.responses import GenericGetListResponse
29from utils.data_grabber import BaseGetListQueryParams, adjust_search_query
30from utils.helper import EntityHash, etag_detalizer, get_hash
async def get_employees(
    sub: CreateSubsidiaryModel | UpdateSubsidiaryModel,
) -> list[dict[str, Any]]:
    """Collect the user records for the employees referenced by ``sub``.

    Every entry of ``sub.employees`` is looked up in ``users_col`` by its
    ``id``; lookups that come back falsy are left out of the result.

    Returns an empty list when ``sub.employees`` is empty or ``None``.
    """
    return [
        found
        for ref in (sub.employees or ())
        if (found := await users_col.find_single("id", ref.id))
    ]
async def on_create_subsidiary(
    sub: CreateSubsidiaryModel,
) -> SubsidiaryDBModel | dict[str, Any]:
    """Store a new subsidiary and assign its employees to it.

    The dumped model has its employee references replaced by the result of
    a single ``users_col.find_batch`` query, is hashed with ``get_hash``
    and is created together with the resulting ``etag``. Afterwards every
    referenced employee is assigned to the new subsidiary; employees for
    which ``assign_sub_to_user`` raises ``UserNotFound`` are skipped.

    Returns the value produced by ``subsidiaries_col.create``.
    """
    payload = sub.model_dump()
    employee_refs = sub.employees

    if employee_refs:
        wanted_ids = [ref.id for ref in employee_refs]
        fetched = await users_col.find_batch({"id": {"$in": wanted_ids}})
        payload[SubsidiaryDBModel.employees[0]] = fetched

    # The hash is taken over the payload *without* the etag key itself.
    record_etag = get_hash(payload)
    created = await subsidiaries_col.create({**payload, "etag": record_etag})

    if employee_refs:
        new_sub_id = str(created["id"])
        for ref in employee_refs:
            # Best effort: an unknown user must not fail the creation.
            with suppress(UserNotFound):
                await assign_sub_to_user(str(ref.id), new_sub_id)

    return created
async def on_update_subsidiary(
    sub_id: ObjectId, subdivision: UpdateSubsidiaryModel
) -> dict[str, Any] | SubsidiaryDBModel:
    """Apply a partial update to a subsidiary and re-sync its employees.

    Only the fields explicitly set on ``subdivision`` are written. A
    non-empty ``employees`` list is taken out of the ``$set`` payload and
    handled through ``assign_sub_to_user`` instead. When
    ``subdivision.etag`` is not ``None`` the update only matches a document
    carrying that etag.

    Raises:
        NotFoundHTTPError: no document matched the id (and etag) filter;
            ``etag_detalizer`` is awaited first.

    Returns the updated document as a ``SubsidiaryDBModel``.
    """
    changes = subdivision.model_dump(exclude_unset=True)
    if subdivision.employees:
        changes.pop("employees")

    etag_filter = {}
    if subdivision.etag is not None:
        etag_filter["etag"] = subdivision.etag

    updated = await subsidiaries_col.collection.find_one_and_update(
        {"_id": sub_id, **etag_filter},
        {"$set": changes},
        return_document=ReturnDocument.AFTER,
    )
    if not updated:
        await etag_detalizer(subsidiaries_col, etag_filter, {"_id": sub_id})
        raise NotFoundHTTPError("филиал")

    await update_etag(updated, subsidiaries_col)
    model = SubsidiaryDBModel(**updated)
    await update_subsidiary(model)

    # Employee assignments are only touched when the caller sent the field.
    if "employees" not in subdivision.model_fields_set:
        return model

    if subdivision.employees:
        # New employees were supplied: point each of them at this subsidiary.
        members, target_sub = subdivision.employees, str(sub_id)
    else:
        # The field was sent empty: detach the employees found on the model.
        # NOTE(review): `model` is built from the post-update document, so
        # this list presumably already reflects the cleared value — confirm
        # that previously assigned users really get detached.
        members, target_sub = model.employees or [], None

    for member in members:
        with suppress(UserNotFound):
            await assign_sub_to_user(str(member.id), target_sub)
    return model
async def on_remove_subsidiary(sub_id: ObjectId, etag: EntityHash | None):
    """Delete a subsidiary, detach its employees and clean up dependants.

    When ``etag`` is truthy the delete only matches a document carrying
    that etag. After a successful delete every employee listed on the
    removed document is unassigned (``UserNotFound`` is ignored) and
    ``remove_subsidiary_from_deps`` is awaited for ``sub_id``.

    Raises:
        NotFoundHTTPError: nothing matched the id (and etag) filter;
            ``etag_detalizer`` is awaited first.
    """
    etag_filter = {}
    if etag:
        etag_filter["etag"] = etag

    removed = await subsidiaries_col.collection.find_one_and_delete(
        {"_id": sub_id, **etag_filter}
    )
    if removed is None:
        await etag_detalizer(subsidiaries_col, etag_filter, {"_id": sub_id})
        raise NotFoundHTTPError("подразделение")

    for member in SubsidiaryDBModel(**removed).employees or ():
        # Best effort: a user that no longer exists is simply skipped.
        with suppress(UserNotFound):
            await assign_sub_to_user(str(member.id), None)

    await remove_subsidiary_from_deps(sub_id)
async def on_add_client_to_subsidiary(
    subsidiary_id: ObjectId, client_id: ObjectId
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Link a client to a subsidiary.

    The client document gets its ``ClientDBModel.subsidiary`` field set to
    a reference to the subsidiary, and ``client_id`` is added to the
    subsidiary's ``clients_ids`` set (the assignment document is created
    on first use through ``upsert=True``).

    Raises:
        NotFoundHTTPError: the subsidiary does not exist, or no client
            document matched ``client_id``.

    Returns the assignment document as it looks after the update.
    """
    subsidiary = await subsidiaries_col.find_single("_id", subsidiary_id)
    if not subsidiary:
        raise NotFoundHTTPError("subsidiary")

    subsidiary_ref = InsertByOIDModel(id=subsidiary_id).model_dump()
    client_update = await clients_col.collection.update_one(
        {"_id": client_id},
        {"$set": {ClientDBModel.subsidiary: subsidiary_ref}},
    )
    # Without this check an unknown client id would still be recorded in
    # the subsidiary's `clients_ids`, leaving a dangling reference.
    if client_update.matched_count == 0:
        raise NotFoundHTTPError("client")

    assigned = await subsidiaries_clients_col.collection.find_one_and_update(
        {"subsidiary_id": subsidiary_id},
        {"$addToSet": {"clients_ids": client_id}},
        upsert=True,
        return_document=ReturnDocument.AFTER,
    )
    return assigned
async def revoke_subsidiary(
    subsidiary_id: ObjectId, client_id: ObjectId
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Remove a client from a subsidiary's assignment document.

    ``client_id`` is pulled from ``clients_ids``; when that leaves the
    list empty the whole assignment document is deleted.

    Returns the post-update assignment as a ``SubsidiaryClientsModel``, or
    an empty ``SubsidiaryClientsModel()`` when the subsidiary had no
    assignment document.
    """
    # NOTE(review): on_add_client_to_subsidiary also sets the subsidiary
    # reference on the client document; it is not cleared here — confirm
    # whether that is intended.
    remaining = await subsidiaries_clients_col.collection.find_one_and_update(
        {"subsidiary_id": subsidiary_id},
        {"$pull": {"clients_ids": client_id}},
        return_document=ReturnDocument.AFTER,
    )
    if not remaining:
        return SubsidiaryClientsModel()

    assignment = SubsidiaryClientsModel(**remaining)
    if assignment.clients_ids:
        return assignment

    # Nothing is assigned any more, so drop the now-empty document.
    await subsidiaries_clients_col.collection.delete_one(
        {"subsidiary_id": subsidiary_id}
    )
    return assignment
async def on_get_assigned_clients_by_sub_id(
    subsidiary_id: ObjectId,
) -> dict[str, Any] | SubsidiaryClientsModel:
    """Return the client-assignment record of a subsidiary.

    Raises:
        NotFoundHTTPError: ``subsidiaries_clients_col`` holds no record
            for ``subsidiary_id``.
    """
    record = await subsidiaries_clients_col.find_single(
        "subsidiary_id", subsidiary_id
    )
    if record:
        return record
    raise NotFoundHTTPError("subsidiary_assignment")
async def on_get_assigned_clients_full_by_sub(
    sub_id: ObjectId, params: BaseGetListQueryParams
) -> GenericGetListResponse[ClientDBModel]:
    """List the full client records assigned to a subsidiary.

    The assigned client ids are read from the subsidiary's assignment
    record, converted to strings and merged into ``params.where`` through
    ``adjust_search_query`` on the ``"id"`` field; the listing itself is
    delegated to ``clients_grabber.get_list``. Note that ``params`` is
    modified in place.

    Returns an empty response (``total=0``) when the subsidiary has no
    assignment record.
    """
    record = await subsidiaries_clients_col.find_single(
        "subsidiary_id", sub_id
    )
    if not record:
        return GenericGetListResponse(total=0, items=[])

    assigned = SubsidiaryClientsModel(**record).clients_ids or []
    id_strings = set(map(str, assigned))
    params.where = adjust_search_query(params.where, "id", id_strings)
    return await clients_grabber.get_list(params)