Coverage for api/handlers/orders/recommendations.py: 32%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import itertools 

3from typing import Any 

4 

5from bson import ObjectId 

6from handlers.authorization.check_role import has_role 

7from handlers.grabbers.orders import orders_data_grabber 

8from mongodb import orders_col, orgs_col, users_col 

9from services.notifications.director import notification_api 

10from services.recommendations import suggestion_api 

11from sotrans_models.models.orders.order import ( 

12 OrderDBModel, 

13 OrderStatus, 

14 Recommendation, 

15 RecommendationMatch, 

16) 

17from sotrans_models.models.responses import GenericGetListResponse 

18from sotrans_models.models.roles import SotransRole 

19from sotrans_models.models.users import ( 

20 SotransOIDCUserModel, 

21 SotransUserDBFieldsModel, 

22) 

23from utils.data_grabber import BaseGetListQueryParams, adjust_search_query 

24from utils.helper import get_org_oid 

25 

26 

def filter_closure():
    """Build a stateful predicate that keeps only the first match per id.

    The returned callable remembers every ``recommendation_id`` it has been
    shown. It returns ``True`` the first time an id appears and ``False`` for
    every later item carrying the same id, which makes it usable with
    ``filter()`` to drop duplicates while preserving the original order.
    Each call to ``filter_closure`` starts with a fresh, empty memory.
    """
    seen_ids = set()

    def filter_recs(rec: RecommendationMatch):
        is_first_occurrence = rec.recommendation_id not in seen_ids
        if is_first_occurrence:
            seen_ids.add(rec.recommendation_id)
        return is_first_occurrence

    return filter_recs

37 

38 

async def get_recommendations_data(
    org_oid: ObjectId,
) -> list[RecommendationMatch]:
    """Collect de-duplicated recommendation matches for an organization.

    Looks up the organization's orders whose status is confirmed, unverified
    or active, asks the suggestion service for recommendations for each of
    them concurrently, and merges the answers into a single list.

    Args:
        org_oid: Id matched against the carrier id stored on each order.

    Returns:
        The recommendation matches for all qualifying orders, in the order
        they were returned, keeping only the first match per
        ``recommendation_id``. Empty when no orders qualify.
    """
    orders = await orders_col.find_batch(
        {
            OrderDBModel.carrier.id: org_oid,
            OrderDBModel.status: {
                "$in": [
                    OrderStatus.confirmed.value,
                    OrderStatus.unverified.value,
                    OrderStatus.active.value,
                ]
            },
        },
        projection={"id": 1},
    )
    # Other callers in this module treat a find_batch result as possibly
    # falsy; bail out early instead of iterating over it.
    if not orders:
        return []

    recommendations_results = await asyncio.gather(
        *(
            suggestion_api.get_recommendations_for_target(order["id"])
            for order in orders
        )
    )
    # on_get_recommendations guards against a falsy answer from the same
    # service call, so skip such answers here too rather than letting a
    # None break the chaining below.
    recs_with_repetitions = itertools.chain.from_iterable(
        result for result in recommendations_results if result
    )

    # Several orders may be matched with the same recommended order.
    return list(filter(filter_closure(), recs_with_repetitions))

65 

66 

async def get_all_recommendations(
    user: SotransOIDCUserModel, params: BaseGetListQueryParams
):
    """Return the list of orders recommended to the user's organization.

    The recommendation matches for the user's organization are fetched first;
    ``params.where`` is then replaced (on the passed-in ``params`` object)
    with a query restricted to the recommended order ids, and the orders are
    loaded through ``orders_data_grabber``. Each loaded order is combined
    with the ``time_gap_hours`` and ``distance_km`` of its match.

    Orders for which no match is found are left out of ``items``; ``total``
    is copied unchanged from the grabber response, so it can be larger than
    ``len(items)`` in that case.
    """
    rec_meta = await get_recommendations_data(get_org_oid(user))
    recommended_ids = {str(rm.recommendation_id) for rm in rec_meta}
    params.where = adjust_search_query(params.where, "id", recommended_ids)
    page = await orders_data_grabber.get_list(params, user)

    meta_by_order_id = {rm.recommendation_id: rm for rm in rec_meta}
    items = [
        Recommendation(
            **order.model_dump(),
            time_gap_hours=found.time_gap_hours,
            distance_km=found.distance_km,
        )
        for order in page.items
        if (found := meta_by_order_id.get(order.id)) is not None
    ]
    return GenericGetListResponse[Recommendation](
        items=items, total=page.total
    )

94 

95 

async def on_get_recommendations(
    order_id: ObjectId, user: SotransOIDCUserModel
) -> list[dict[str, Any] | Recommendation]:
    """Return the recommendations attached to a single order.

    A user who has the carrier-logistician role without also having the
    company-director role may only query an order that is not in the
    ``appointment`` status and that is either in the ``exchange`` status or
    carried by the user's own organization; otherwise an empty list is
    returned. Other users skip this check.

    Args:
        order_id: Id of the order recommendations are requested for.
        user: The requesting user.

    Returns:
        One ``Recommendation`` per match whose recommended order could be
        loaded, enriched with the match's distance and time gap. Empty when
        the order is not visible to the user or has no matches.
    """
    restricted_to_own_scope = has_role(
        user, SotransRole.carrier_logistician
    ) and not has_role(user, SotransRole.company_director)
    if restricted_to_own_scope:
        visibility_query = {
            OrderDBModel.status: {"$ne": OrderStatus.appointment.value},
            "$or": [
                {OrderDBModel.status: OrderStatus.exchange.value},
                {OrderDBModel.carrier.id: get_org_oid(user)},
            ],
            "id": order_id,
        }
        if not await orders_col.collection.find_one(visibility_query):
            return []

    suggestions = await suggestion_api.get_recommendations_for_target(
        order_id
    )
    if not suggestions:
        return []

    # Recommended orders are loaded one at a time, in match order; matches
    # whose order cannot be found are dropped.
    return [
        Recommendation(
            **order_data,
            distance_km=suggestion.distance_km,
            time_gap_hours=suggestion.time_gap_hours,
        )
        for suggestion in suggestions
        if (
            order_data := await orders_col.find_single(
                "id", suggestion.recommendation_id
            )
        )
    ]

133 

134 

async def notify_with_recommended_summary():
    """Notify each organization's users about their recommendation count.

    Walks over every document in the organizations collection, counts the
    de-duplicated recommendations available for it, and hands the count to
    the notification service together with the organization's users.
    Organizations with no recommendations or no users are skipped.
    """
    for org in await orgs_col.find_batch({}, projection={"_id": 1}):
        org_id = org["_id"]
        recs_count = len(await get_recommendations_data(org_id))
        if recs_count:
            # Users reference their organization by the string form of its id.
            employees = await users_col.find_batch(
                {SotransUserDBFieldsModel.organization_id: str(org_id)}
            )
            if employees:
                # NOTE(review): this call is not awaited; confirm that
                # notify_with_recommended is a synchronous function.
                notification_api.notify_with_recommended(employees, recs_count)