Coverage for api/handlers/int1c.py: 20%

22 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from typing import Any 

2 

3from database.integration_1c.integration_mappers import ( 

4 make_driver_1c, 

5 make_order_1c, 

6 make_org_1c, 

7 make_vehicle_1c, 

8) 

9from mongodb import ( 

10 drivers_col, 

11 orders_col, 

12 orgs_col, 

13 trailers_col, 

14 trucks_col, 

15 updates1c_col, 

16) 

17from sotrans_models.models.updates1c import ( 

18 Processing1CResult, 

19 Update1CModel, 

20 UpdateStatus, 

21) 

22 

23 

async def on_get_updates(
    status: UpdateStatus | None,
) -> list[dict[str, Any] | Update1CModel]:
    """Fetch 1C update records and wrap each one in an ``Update1CModel``.

    Records are read from ``updates1c_col`` with ``find_batch``. Each record
    is converted to a payload by the mapper registered for the collection
    named in its ``"collection"`` field.

    Args:
        status: When not ``None``, only records whose ``"status_1c"`` equals
            ``status.value`` are requested. When ``None``, an empty filter
            is passed to ``find_batch``.

    Returns:
        One ``Update1CModel`` per record, in the order the records were
        returned by ``find_batch``.

    Raises:
        KeyError: If a record's ``"collection"`` value has no registered
            mapper, or a record lacks ``"_id"`` or ``"status_1c"``.
    """
    # Function-scope import so the block does not depend on edits to the
    # file's import section.
    import inspect

    pattern = {} if status is None else {"status_1c": status.value}
    updates = await updates1c_col.find_batch(pattern)

    # Looked up once: it is used both as a dispatcher key and in the
    # per-record trailer check below.
    trailers_name = trailers_col.collection_name
    entity_mapper_dispatcher = {
        orgs_col.collection_name: make_org_1c,
        trucks_col.collection_name: make_vehicle_1c,
        trailers_name: make_vehicle_1c,
        drivers_col.collection_name: make_driver_1c,
        orders_col.collection_name: make_order_1c,
    }

    mapped_entities = []
    for up in updates:
        collection_name = up.get("collection")
        # Trailers share the vehicle mapper but pass an extra positional
        # ``False``. NOTE(review): presumably this tells the mapper the
        # vehicle is not a truck -- confirm against make_vehicle_1c.
        args = (False,) if collection_name == trailers_name else ()
        payload = entity_mapper_dispatcher[collection_name](up, *args)
        # Mappers may be sync or async. Await only genuine awaitables
        # instead of treating every non-dict result as one.
        if inspect.isawaitable(payload):
            payload = await payload
        mapped_entities.append(
            Update1CModel(
                collection=collection_name,
                mongo_id=up["_id"],
                status=up["status_1c"],
                code=up.get("code_1c"),
                payload=payload,
            )
        )
    return mapped_entities

57 

58 

async def apply_processing_results(updates: list[Processing1CResult]):
    """Write 1C processing results back to their update records.

    For every result, ``updates1c_col.update_by_id`` is awaited with the
    result's ``id`` and a mapping that sets ``"status_1c"`` to
    ``status.value`` and ``"code_1c"`` to ``code``. Results are applied one
    at a time, in list order.

    Args:
        updates: Processing results to apply.
    """
    for result in updates:
        record_id = result.id
        changes = {"status_1c": result.status.value, "code_1c": result.code}
        await updates1c_col.update_by_id(record_id, changes)