Coverage for api/handlers/resources/trucks.py: 43%

59 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2from typing import Any 

3 

4from bson import ObjectId 

5from database.entity import update_etag_and_text_search 

6from database.integration_1c.savers import remove_from_1c, update_in_1c 

7from database.updater import update_resource_in_order 

8from exceptions import BadParameterHTTPError, NotFoundHTTPError 

9from handlers.grabbers.resources import trucks_data_grabber 

10from handlers.resources import documents_creation, documents_on_update 

11from handlers.resources.common import assert_resource_manage 

12from handlers.resources.processor import process_final_entity 

13from mongodb import docs_col, trucks_col 

14from pymongo import ReturnDocument 

15from pymongo.errors import DuplicateKeyError 

16from sotrans_models.models.misc.document import DocumentDBModel 

17from sotrans_models.models.resources.trucks import ( 

18 TruckCreateModel, 

19 TruckDBModel, 

20 TruckUpdateModel, 

21) 

22from sotrans_models.models.responses import GenericGetListResponse 

23from sotrans_models.models.users import SotransOIDCUserModel 

24from sotrans_models.utils.text_mappers import get_vehicles_text_search 

25from utils.data_grabber import BaseGetListQueryParams, adjust_search_query 

26from utils.dt_utils import get_current_datetime 

27from utils.helper import EntityHash, get_org_oid 

28 

29 

30async def on_create_truck( 

31 creation_data: TruckCreateModel, executor: SotransOIDCUserModel 

32) -> TruckDBModel | dict[str, Any]: 

33 org_id = get_org_oid(executor) 

34 data = creation_data.model_dump(exclude_unset=True) 

35 if TruckDBModel.documents[0] in data: 

36 del data["documents"] 

37 db_model = TruckDBModel( 

38 organization_id=org_id, 

39 created_at=get_current_datetime(), 

40 is_active=True, 

41 **data, 

42 ) 

43 try: 

44 created = await trucks_col.create(db_model.model_dump()) 

45 except DuplicateKeyError: 

46 raise BadParameterHTTPError("номер уже зарегистрирован") 

47 updated = await documents_creation( 

48 creation_data, trucks_col, created, executor 

49 ) 

50 return await process_final_entity( 

51 updated if updated else created, trucks_col, TruckDBModel 

52 ) 

53 

54 

55async def on_get_trucks( 

56 params: BaseGetListQueryParams, 

57) -> GenericGetListResponse[TruckDBModel]: 

58 return await trucks_data_grabber.get_list(params) 

59 

60 

61async def on_get_truck( 

62 truck_id: ObjectId, executor: SotransOIDCUserModel 

63) -> TruckDBModel: 

64 return await assert_resource_manage(truck_id, trucks_col, executor) 

65 

66 

67async def on_update_truck( 

68 truck_id: ObjectId, 

69 update_data: TruckUpdateModel, 

70 executor: SotransOIDCUserModel, 

71) -> TruckDBModel: 

72 await assert_resource_manage( 

73 truck_id, trucks_col, executor, update_data.etag 

74 ) 

75 data = await documents_on_update( 

76 trucks_col, truck_id, update_data, executor 

77 ) 

78 etag_q = {"etag": update_data.etag} if update_data.etag else {} 

79 try: 

80 truck = await trucks_col.collection.find_one_and_update( 

81 {"_id": truck_id} | etag_q, 

82 {"$set": data}, 

83 return_document=ReturnDocument.AFTER, 

84 ) 

85 except DuplicateKeyError: 

86 raise BadParameterHTTPError("номер уже зарегистрирован") 

87 if truck is None: 

88 raise NotFoundHTTPError("truck") 

89 await update_etag_and_text_search( 

90 truck, trucks_col, TruckDBModel, get_vehicles_text_search 

91 ) 

92 tm = TruckDBModel(**truck) 

93 asyncio.create_task(update_in_1c(truck_id, trucks_col.collection_name, tm)) 

94 await update_resource_in_order(truck_id, truck, "truck") 

95 return tm 

96 

97 

98async def on_delete_truck( 

99 truck_id: ObjectId, executor: SotransOIDCUserModel, etag: EntityHash | None 

100): 

101 await assert_resource_manage(truck_id, trucks_col, executor, etag) 

102 await trucks_col.delete_by_id(truck_id) 

103 await docs_col.collection.update_many( 

104 {DocumentDBModel.object_id: truck_id}, 

105 {"$set": {DocumentDBModel.deleted_at: get_current_datetime()}}, 

106 ) 

107 await update_resource_in_order(truck_id, None, "truck") 

108 asyncio.create_task(remove_from_1c(truck_id, trucks_col.collection_name))