Coverage for api/operations/assignment.py: 18%

38 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import uuid 

3from typing import Any 

4 

5from exceptions import BadParameterHTTPError 

6from mongodb import orgs_col, subsidiaries_col, users_col 

7from services.notifications.director import notification_api 

8from sotrans_models.models.orders.order import ( 

9 Assignment, 

10 OrderDBModel, 

11 OrderUpdateModel, 

12) 

13from sotrans_models.models.organizations import CreateSubsidiaryModel 

14from utils.helper import check_assigned 

15 

16 

async def data_from_assigned(order: OrderUpdateModel | None) -> dict:
    """Build the ``assigned.company`` update payload for an order update.

    Returns an empty dict when there is no order, no ``assigned`` section,
    no ``assigned.company``, or when neither the subsidiary nor the employee
    carries a truthy ``id``.

    Otherwise returns ``{"assigned": {"company": {...}}}`` where the inner
    dict holds an ``"employee"`` and/or ``"subsidiary"`` key for each of
    those fields present in the company's ``model_fields_set``. A field that
    was set to ``None`` is passed through as ``None``; any other value is
    replaced by the record fetched from the corresponding collection.

    Raises:
        BadParameterHTTPError: if the subsidiary is a
            ``CreateSubsidiaryModel``, or if the employee / subsidiary
            lookup returns nothing.
    """
    if not order or not order.assigned or not order.assigned.company:
        return {}

    company = order.assigned.company

    # Bail out unless at least one of the two references carries an id.
    # The employee is only inspected when the subsidiary check fails,
    # matching a short-circuiting ``or``.
    if not (company.subsidiary and company.subsidiary.id):
        if not (company.employee and company.employee.id):
            return {}

    if isinstance(company.subsidiary, CreateSubsidiaryModel):
        raise BadParameterHTTPError("только существующие подразделения")

    company_update: dict = {}
    provided_fields = company.model_fields_set

    if "employee" in provided_fields:
        if company.employee is None:
            company_update["employee"] = None
        else:
            found_employee = await users_col.find_single(
                "id",
                company.employee.id,
            )
            if found_employee is None:
                raise BadParameterHTTPError("сотрудник")
            company_update["employee"] = found_employee

    if "subsidiary" in provided_fields:
        if company.subsidiary is None:
            company_update["subsidiary"] = None
        else:
            found_subsidiary = await subsidiaries_col.collection.find_one(
                {"_id": company.subsidiary.id},
            )
            if not found_subsidiary:
                raise BadParameterHTTPError("подразделение")
            company_update["subsidiary"] = found_subsidiary

    return {"assigned": {"company": company_update}}

51 

52 

def notify_if_assigned(
    order_update: OrderUpdateModel | None,
    order_data: dict[str, Any] | OrderDBModel,
):
    """Send an assignment notification when the update passes ``check_assigned``.

    Does nothing when ``order_update`` is ``None`` or ``check_assigned``
    returns a falsy value for it. Otherwise ``order_data`` is converted to an
    ``OrderDBModel`` (if it was given as a dict) and handed to
    ``notification_api.assignment``.
    """
    if order_update is None or not check_assigned(order_update):
        return

    if isinstance(order_data, dict):
        order_model = OrderDBModel(**order_data)
    else:
        order_model = order_data
    notification_api.assignment(order_model)