Coverage for api/operations/assignment.py: 18%
38 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import uuid
3from typing import Any
5from exceptions import BadParameterHTTPError
6from mongodb import orgs_col, subsidiaries_col, users_col
7from services.notifications.director import notification_api
8from sotrans_models.models.orders.order import (
9 Assignment,
10 OrderDBModel,
11 OrderUpdateModel,
12)
13from sotrans_models.models.organizations import CreateSubsidiaryModel
14from utils.helper import check_assigned
async def data_from_assigned(order: OrderUpdateModel | None) -> dict:
    """Build the ``assigned.company`` part of an order update payload.

    Args:
        order: Incoming order update, or ``None``.

    Returns:
        ``{}`` when there is nothing to resolve: no order, no assignment,
        no company, or neither the subsidiary nor the employee carries a
        truthy ``id``.  Otherwise ``{"assigned": {"company": {...}}}`` where
        the inner dict holds an ``"employee"`` and/or ``"subsidiary"`` entry
        for each of those fields that was explicitly set on the company
        (per ``model_fields_set``).  Each entry is either ``None`` or the
        record returned by the corresponding collection lookup.

    Raises:
        BadParameterHTTPError: if the subsidiary is a
            ``CreateSubsidiaryModel``, or if the referenced employee or
            subsidiary is not found.
    """
    company = order.assigned.company if order and order.assigned else None
    if not company:
        return {}

    # At least one of the two references must carry an id; otherwise there
    # is nothing to look up.
    if not (company.subsidiary and company.subsidiary.id) and not (
        company.employee and company.employee.id
    ):
        return {}

    # NOTE(review): this rejection runs only after the id guard above, so a
    # CreateSubsidiaryModel without an id (and no employee id) yields {}
    # instead of an error -- confirm that ordering is intended.
    if isinstance(company.subsidiary, CreateSubsidiaryModel):
        raise BadParameterHTTPError("только существующие подразделения")

    company_patch: dict = {}
    explicitly_set = company.model_fields_set

    if "employee" in explicitly_set:
        employee_record = None
        if company.employee is not None:
            employee_record = await users_col.find_single(
                "id",
                company.employee.id,
            )
            if employee_record is None:
                raise BadParameterHTTPError("сотрудник")
        company_patch["employee"] = employee_record

    if "subsidiary" in explicitly_set:
        subsidiary_record = None
        if company.subsidiary is not None:
            subsidiary_record = await subsidiaries_col.collection.find_one(
                {"_id": company.subsidiary.id},
            )
            if not subsidiary_record:
                raise BadParameterHTTPError("подразделение")
        company_patch["subsidiary"] = subsidiary_record

    return {"assigned": {"company": company_patch}}
def notify_if_assigned(
    order_update: OrderUpdateModel | None,
    order_data: dict[str, Any] | OrderDBModel,
):
    """Send an assignment notification when the update passes ``check_assigned``.

    Args:
        order_update: The incoming order update; ``None`` means there is
            nothing to check and no notification is sent.
        order_data: The order to notify about, either as an ``OrderDBModel``
            or as a dict of keyword arguments from which one is built.
    """
    if order_update is None or not check_assigned(order_update):
        return

    # The notification API is always handed a model instance, so a raw dict
    # is converted first.
    if isinstance(order_data, dict):
        order_model = OrderDBModel(**order_data)
    else:
        order_model = order_data
    notification_api.assignment(order_model)