Coverage for api/handlers/orders/recommendations.py: 32%
69 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import itertools
3from typing import Any
5from bson import ObjectId
6from handlers.authorization.check_role import has_role
7from handlers.grabbers.orders import orders_data_grabber
8from mongodb import orders_col, orgs_col, users_col
9from services.notifications.director import notification_api
10from services.recommendations import suggestion_api
11from sotrans_models.models.orders.order import (
12 OrderDBModel,
13 OrderStatus,
14 Recommendation,
15 RecommendationMatch,
16)
17from sotrans_models.models.responses import GenericGetListResponse
18from sotrans_models.models.roles import SotransRole
19from sotrans_models.models.users import (
20 SotransOIDCUserModel,
21 SotransUserDBFieldsModel,
22)
23from utils.data_grabber import BaseGetListQueryParams, adjust_search_query
24from utils.helper import get_org_oid
def filter_closure():
    """Build a stateful predicate that lets each recommendation id through once.

    Returns:
        A callable taking an object with a ``recommendation_id`` attribute.
        It returns ``True`` the first time a given id is seen and ``False``
        on every later occurrence. Each call to ``filter_closure`` creates
        a predicate with its own, initially empty, memory of seen ids.
    """
    seen_ids = set()

    def filter_recs(rec: "RecommendationMatch") -> bool:
        rec_id = rec.recommendation_id
        first_occurrence = rec_id not in seen_ids
        if first_occurrence:
            # Remember the id so that later duplicates are rejected.
            seen_ids.add(rec_id)
        return first_occurrence

    return filter_recs
async def get_recommendations_data(
    org_oid: ObjectId,
) -> list[RecommendationMatch]:
    """Collect de-duplicated recommendation matches for a carrier's orders.

    Finds the orders whose carrier is ``org_oid`` and whose status is
    confirmed, unverified or active, requests recommendations for each of
    them concurrently from the suggestion API, and merges the answers.

    Args:
        org_oid: Identifier of the carrier organisation.

    Returns:
        The merged matches in the order they were produced, keeping only the
        first match seen for each ``recommendation_id``.
    """
    orders = await orders_col.find_batch(
        {
            OrderDBModel.carrier.id: org_oid,
            OrderDBModel.status: {
                "$in": [
                    OrderStatus.confirmed.value,
                    OrderStatus.unverified.value,
                    OrderStatus.active.value,
                ]
            },
        },
        projection={"id": 1},
    )
    recommendations_results = await asyncio.gather(
        *(
            suggestion_api.get_recommendations_for_target(order["id"])
            for order in orders
        )
    )
    # on_get_recommendations treats a falsy answer from the suggestion API as
    # "no recommendations"; skip such answers here too so that one empty or
    # None result cannot break the merge of all the others.
    recs_with_repetitions = itertools.chain.from_iterable(
        result for result in recommendations_results if result
    )
    return list(filter(filter_closure(), recs_with_repetitions))
async def get_all_recommendations(
    user: SotransOIDCUserModel, params: BaseGetListQueryParams
):
    """List the orders recommended to the user's organisation.

    Gathers recommendation metadata for the user's organisation, restricts
    ``params.where`` to the recommended order ids, loads those orders through
    ``orders_data_grabber`` and attaches ``time_gap_hours`` and
    ``distance_km`` from the metadata to each of them.

    Args:
        user: The requesting user; the organisation is derived from it.
        params: List query parameters. ``params.where`` is overwritten with
            the adjusted search query when there is at least one
            recommendation.

    Returns:
        A ``GenericGetListResponse[Recommendation]`` with the enriched orders.
    """
    rec_meta = await get_recommendations_data(get_org_oid(user))
    if not rec_meta:
        # Orders without matching metadata are dropped below, so with no
        # recommendations the item list is always empty. Skip the order query
        # and report a total that agrees with the empty list.
        return GenericGetListResponse[Recommendation](items=[], total=0)

    params.where = adjust_search_query(
        params.where,
        "id",
        {str(rm.recommendation_id) for rm in rec_meta},
    )
    recommended_orders = await orders_data_grabber.get_list(params, user)

    meta_map = {meta.recommendation_id: meta for meta in rec_meta}
    items = [
        Recommendation(
            **order.model_dump(),
            time_gap_hours=meta.time_gap_hours,
            distance_km=meta.distance_km,
        )
        for order in recommended_orders.items
        # Orders for which no metadata is found are left out of the result.
        if (meta := meta_map.get(order.id)) is not None
    ]
    # NOTE(review): total is taken from the order query as-is, so it still
    # counts any orders skipped above for lacking metadata.
    return GenericGetListResponse[Recommendation](
        items=items, total=recommended_orders.total
    )
async def on_get_recommendations(
    order_id: ObjectId, user: SotransOIDCUserModel
) -> list[dict[str, Any] | Recommendation]:
    """Return the recommendations for one order.

    A user who has the carrier-logistician role but not the company-director
    role only gets recommendations when the order is not in the appointment
    status and is either in the exchange status or carried by the user's own
    organisation. All other users skip that check.

    Args:
        order_id: Identifier of the order to get recommendations for.
        user: The requesting user.

    Returns:
        One ``Recommendation`` per recommended order that could be loaded,
        in the order supplied by the suggestion API. Empty when the access
        check fails or when there are no recommendations.
    """
    if has_role(user, SotransRole.carrier_logistician) and not has_role(
        user, SotransRole.company_director
    ):
        order = await orders_col.collection.find_one(
            {
                OrderDBModel.status: {"$ne": OrderStatus.appointment.value},
                "$or": [
                    {OrderDBModel.status: OrderStatus.exchange.value},
                    {OrderDBModel.carrier.id: get_org_oid(user)},
                ],
                "id": order_id,
            }
        )
        if not order:
            return []

    recommendations_meta = await suggestion_api.get_recommendations_for_target(
        order_id
    )
    if not recommendations_meta:
        return []

    # Materialise once: the metadata is walked twice below.
    metas = list(recommendations_meta)
    # Load all recommended orders concurrently instead of one awaited lookup
    # per loop iteration; gather returns results in the order of its inputs,
    # so each result still lines up with its metadata entry.
    recommended_orders = await asyncio.gather(
        *(
            orders_col.find_single("id", meta.recommendation_id)
            for meta in metas
        )
    )
    return [
        Recommendation(
            **recommended_data,
            distance_km=meta.distance_km,
            time_gap_hours=meta.time_gap_hours,
        )
        for meta, recommended_data in zip(metas, recommended_orders)
        # Recommended orders that could not be found are left out.
        if recommended_data
    ]
async def notify_with_recommended_summary():
    """Notify each organisation's users about how many recommendations it has.

    Walks every document in ``orgs_col``, counts the de-duplicated
    recommendations for that organisation and, when there is at least one
    recommendation and at least one user whose ``organization_id`` equals the
    stringified organisation id, passes those users and the count to
    ``notification_api.notify_with_recommended``.
    """
    for org in await orgs_col.find_batch({}, projection={"_id": 1}):
        org_id = org["_id"]
        recommendations = await get_recommendations_data(org_id)
        if not recommendations:
            continue
        staff = await users_col.find_batch(
            {SotransUserDBFieldsModel.organization_id: str(org_id)}
        )
        if staff:
            # NOTE(review): this call is not awaited — confirm that
            # notify_with_recommended is synchronous.
            notification_api.notify_with_recommended(
                staff, len(recommendations)
            )