Coverage for api/services/notifications/director.py: 17%
282 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import collections
3import json
4import uuid
5from typing import Any
7from bson import ObjectId
8from config import MIN_COUNT_TO_NOTIFY_ON_SCRAPED, NotificationAPIConfig
9from logging_config import logger
10from mongodb import docs_col, drivers_col, trailers_col, trucks_col, users_col
11from pydantic import BaseModel
12from services.microservice_connector import NotificationConnector
13from services.notifications.builder import NotificationBuilder
14from sotrans_models.models.misc.client import ClientDBModel
15from sotrans_models.models.misc.document import DocumentDBModel
16from sotrans_models.models.misc.verification import VerificationStatus
17from sotrans_models.models.orders.bid import BidDBModel
18from sotrans_models.models.orders.order import OrderDBModel, OrderStatus
19from sotrans_models.models.organizations import (
20 OrganizationDBModel,
21 SubsidiaryDBModel,
22)
23from sotrans_models.models.resources.drivers import DriverDBModel
24from sotrans_models.models.resources.trailers import TrailerDBModel
25from sotrans_models.models.resources.trucks import TruckDBModel
26from sotrans_models.models.roles import SotransRole
27from sotrans_models.models.services.notifications import (
28 CastType,
29 NotificationCreateBatchModel,
30 NotificationCreateModel,
31 NotificationCreateMulticastModel,
32)
33from sotrans_models.models.users import (
34 SotransUserDBFieldsModel,
35 SotransUserDBModel,
36)
37from utils.dt_utils import get_current_datetime
38from utils.helper import is_organization_filled
41class ClientToNotifyAbout(BaseModel):
42 subsidiary: SubsidiaryDBModel | None = None
43 responsible: list[SotransUserDBModel] | None = None
46class ScrapedNotificationContainer(BaseModel):
47 order_by_clients: list[ClientToNotifyAbout]
50class NotificationDirector:
51 connector: NotificationConnector
52 builder: NotificationBuilder
54 def __init__(
55 self, connector: NotificationConnector, builder: NotificationBuilder
56 ):
57 self.connector = connector
58 self.builder = builder
60 def _make_request(
61 self,
62 notification_model: NotificationCreateBatchModel
63 | NotificationCreateModel
64 | NotificationCreateMulticastModel
65 | None,
66 ):
67 if notification_model is None:
68 return
69 asyncio.create_task(
70 self.connector.make_request(
71 json.loads(notification_model.model_dump_json()),
72 is_multicast=isinstance(
73 notification_model, NotificationCreateMulticastModel
74 ),
75 )
76 )
78 async def order_archived(self, order: OrderDBModel):
79 if not (order.carrier and order.carrier.id):
80 return
81 users = await get_users_to_notify(order.carrier.id)
82 if not (users or order.carrier.contact_user):
83 return
84 notification_model = self.builder.with_preset_type(
85 "order_archived"
86 ).with_object(order.model_dump(format_ids=False))
87 if users:
88 notification_model = notification_model.with_cast_many(
89 CastType.organization, users
90 )
91 if order.carrier.contact_user:
92 notification_model = notification_model.with_cast_one(
93 CastType.personal, order.carrier.contact_user
94 )
95 self._make_request(notification_model.build())
97 async def success_inn_verification(
98 self, organization: OrganizationDBModel
99 ):
100 users = await get_users_to_notify(organization.id)
101 if len(users) == 0:
102 if organization.owner is None:
103 raise ValueError(
104 "success_inn_verification notification requires employees or owner present in organization"
105 )
106 users = [organization.owner]
107 notification_model = (
108 self.builder.with_cast_many(CastType.organization, users)
109 .with_preset_type("success_inn_verification")
110 .with_object(organization.model_dump(format_ids=False))
111 .build()
112 )
113 self._make_request(notification_model)
115 async def better_bid(self, order: OrderDBModel, previous_bid: BidDBModel):
116 if not (previous_bid.carrier and previous_bid.carrier.id):
117 return
118 employees = await get_users_to_notify(previous_bid.carrier.id)
119 notification_model = (
120 self.builder.with_cast_one(CastType.personal, previous_bid.owner)
121 .with_cast_many(CastType.organization, employees)
122 .with_preset_type("better_bid")
123 .with_object(order.model_dump(format_ids=False))
124 .build()
125 )
126 self._make_request(notification_model)
128 async def organization_filled(
129 self,
130 organization: dict,
131 ):
132 is_filled = is_organization_filled(organization)
133 if is_filled is False:
134 return None
135 organization_model = OrganizationDBModel(**organization)
136 employees = await get_users_to_notify(organization_model.id)
137 notification_model = (
138 self.builder.with_cast_many(CastType.organization, employees)
139 .with_preset_type("success_organization_filled")
140 .with_object(organization_model.model_dump(format_ids=False))
141 .build()
142 )
143 self._make_request(notification_model)
145 async def order_is_active(self, order: OrderDBModel):
146 if not (order.carrier and order.carrier.id):
147 return
148 employees = await get_users_to_notify(order.carrier.id)
149 notification_model = (
150 self.builder.with_cast_many(CastType.organization, employees)
151 .with_preset_type("order_active")
152 .with_cast_one(CastType.personal, order.carrier.contact_user)
153 .with_object(order.model_dump(format_ids=False))
154 .build()
155 )
156 self._make_request(notification_model)
158 async def order_confirmed(self, order: OrderDBModel):
159 if not (order.carrier and order.carrier.id):
160 return
161 employees = await get_users_to_notify(order.carrier.id)
162 notification = (
163 self.builder.with_cast_many(CastType.organization, employees)
164 .with_preset_type("order_confirmed")
165 .with_object(order.model_dump(format_ids=False))
166 )
167 if order.carrier and order.carrier.contact_user:
168 notification = notification.with_cast_one(
169 CastType.personal, order.carrier.contact_user
170 )
171 self._make_request(notification.build())
173 def admin_notification_on_parser(
174 self, error_data: dict, admins: list[SotransUserDBModel]
175 ):
176 notification_model = (
177 self.builder.with_preset_type("parser_failed")
178 .with_cast_many(CastType.organization, admins)
179 .with_object(error_data)
180 .build()
181 )
182 self._make_request(notification_model=notification_model)
184 async def order_canceled(self, order: OrderDBModel):
185 if not (order.carrier and order.carrier.id):
186 return
187 employees = await get_users_to_notify(order.carrier.id)
188 if not (employees or order.carrier.contact_user):
189 return
190 notification = self.builder.with_preset_type(
191 "carrier_canceled_order"
192 ).with_object(order.model_dump(format_ids=False))
193 if employees:
194 notification = notification.with_cast_many(
195 CastType.organization, employees
196 )
197 if order.carrier.contact_user:
198 notification = notification.with_cast_one(
199 CastType.personal, order.carrier.contact_user
200 )
201 self._make_request(notification.build())
203 async def order_reserved(
204 self,
205 order: OrderDBModel,
206 ):
207 if order.carrier is None:
208 return
209 employees: list[SotransUserDBModel] = await get_users_to_notify(
210 order.carrier.id
211 )
212 logger.warning(f"order_reserved, users: {len(employees)}")
213 notification = (
214 self.builder.with_cast_many(CastType.organization, employees)
215 .with_preset_type("order_reserved")
216 .with_object(order.model_dump(format_ids=False))
217 )
218 if order.carrier and order.carrier.contact_user:
219 notification = notification.with_cast_one(
220 CastType.personal, order.carrier.contact_user
221 )
222 self._make_request(notification.build())
224 async def verification_result(
225 self,
226 organization: OrganizationDBModel,
227 collection_name: str,
228 related: dict[str, Any],
229 status: VerificationStatus,
230 ):
231 if status == VerificationStatus.accepted:
232 result = "success"
233 else:
234 result = "failed"
235 if collection_name == docs_col.collection_name:
236 obj = DocumentDBModel(**related)
237 elif collection_name == drivers_col.collection_name:
238 obj = DriverDBModel(**related)
239 elif collection_name == trucks_col.collection_name:
240 obj = TruckDBModel(**related)
241 elif collection_name == trailers_col.collection_name:
242 obj = TrailerDBModel(**related)
243 else:
244 obj = OrganizationDBModel(**related)
245 entity_name = collection_name.rstrip("s")
246 preset = "_".join((result, entity_name, "verification"))
247 employees = await get_users_to_notify(organization.id)
248 notification_model = (
249 self.builder.with_cast_many(CastType.organization, employees)
250 .with_preset_type(preset)
251 .with_object(obj.model_dump(format_ids=False))
252 .build()
253 )
254 self._make_request(notification_model)
256 async def order_confirmed_timeout(
257 self, order: OrderDBModel, minutes_left: int
258 ):
259 if order.carrier is None:
260 return
261 employees = await get_users_to_notify(order.carrier.id)
262 notification = (
263 self.builder.with_cast_many(CastType.organization, employees)
264 .with_preset_type("order_confirmed_timeout_warning")
265 .with_object(
266 {
267 **order.model_dump(format_ids=False),
268 "minutes_left": minutes_left,
269 }
270 )
271 )
272 if order.carrier and order.carrier.contact_user:
273 notification = notification.with_cast_one(
274 CastType.personal, order.carrier.contact_user
275 )
276 self._make_request(notification.build())
278 def fill_companys_recepients(
279 self, order: OrderDBModel, made_by_user_id: uuid.UUID | None = None
280 ) -> NotificationBuilder | None:
281 if not (order.assigned and order.assigned.company):
282 return None
283 notification = self.builder
285 if order.assigned.company.subsidiary and (
286 order_subs_emps := (
287 order.assigned.company.subsidiary.employees
288 if made_by_user_id is None
289 or order.assigned.company.subsidiary.employees is None
290 else [
291 e
292 for e in order.assigned.company.subsidiary.employees
293 if e.id != made_by_user_id
294 ]
295 )
296 ):
297 notification = notification.with_cast_many(
298 CastType.subdivision, order_subs_emps
299 )
300 if order_emp := (
301 order.assigned.company.employee
302 if order.assigned.company.employee
303 and order.assigned.company.employee.id != made_by_user_id
304 else None
305 ):
306 notification = notification.with_cast_one(
307 CastType.order_responsible, order_emp
308 )
309 return notification
311 def company_order_reserved(self, order: OrderDBModel):
312 to_send = self.fill_companys_recepients(order)
313 if to_send is None:
314 return
315 self._make_request(
316 to_send.with_preset_type("company_order_reserved")
317 .with_object(
318 order.model_dump(format_ids=False),
319 )
320 .build()
321 )
323 def company_order_canceled(
324 self, order: OrderDBModel, by_user_id: uuid.UUID
325 ):
326 to_send = self.fill_companys_recepients(order, by_user_id)
327 if to_send is None:
328 return
329 self._make_request(
330 to_send.with_preset_type("company_canceled_order")
331 .with_object(
332 order.model_dump(format_ids=False),
333 )
334 .build()
335 )
337 def company_order_confirmed(self, order: OrderDBModel):
338 to_send = self.fill_companys_recepients(order)
339 if to_send is None:
340 return
341 self._make_request(
342 to_send.with_preset_type("company_confirmed_order")
343 .with_object(
344 order.model_dump(format_ids=False),
345 )
346 .build()
347 )
349 def company_order_unverified(self, order: OrderDBModel):
350 to_send = self.fill_companys_recepients(order)
351 if to_send is None:
352 return
353 self._make_request(
354 to_send.with_preset_type("company_unverified_order")
355 .with_object(
356 order.model_dump(format_ids=False),
357 )
358 .build()
359 )
361 def company_order_completed(self, order: OrderDBModel):
362 to_send = self.fill_companys_recepients(order)
363 if to_send is None:
364 return
365 self._make_request(
366 to_send.with_preset_type("company_order_completed")
367 .with_object(
368 order.model_dump(format_ids=False),
369 )
370 .build()
371 )
373 def company_auction_ends_soon(self, order: OrderDBModel):
374 to_send = self.fill_companys_recepients(order)
375 if to_send is None:
376 return
377 current_dt = get_current_datetime()
378 minutes_left = (
379 order.auction_end_time - current_dt.replace(tzinfo=None)
380 ).total_seconds() // 60
381 if minutes_left < 5:
382 return
383 self._make_request(
384 to_send.with_preset_type("company_auction_ends_soon")
385 .with_object(
386 {
387 **order.model_dump(format_ids=False),
388 **{"minutes_left": minutes_left},
389 },
390 )
391 .build()
392 )
394 def company_auction_ended(self, order: OrderDBModel):
395 to_send = self.fill_companys_recepients(order)
396 if to_send is None:
397 return
398 if order.status == OrderStatus.canceled:
399 to_send = to_send.with_preset_type("company_auction_ended_no_bid")
400 else:
401 to_send = to_send.with_preset_type("company_auction_ended_success")
402 self._make_request(
403 to_send.with_object( # type: ignore[union-attr]
404 order.model_dump(format_ids=False),
405 ).build()
406 )
408 def assignment(self, order: OrderDBModel):
409 to_send: NotificationBuilder = self.fill_companys_recepients( # type: ignore[assignment]
410 order
411 )
412 if to_send is None:
413 return
414 if order.status == OrderStatus.appointment:
415 to_send = to_send.with_preset_type(
416 f"company_{OrderStatus.appointment.value}_assigned_order"
417 )
418 elif order.status == OrderStatus.confirmed:
419 to_send = to_send.with_preset_type(
420 f"company_{OrderStatus.confirmed.value}_assigned_order"
421 )
422 else:
423 to_send = to_send.with_preset_type(
424 f"company_{OrderStatus.exchange.value}_assigned_order"
425 )
426 self._make_request(
427 to_send.with_object(
428 order.model_dump(format_ids=False),
429 ).build()
430 )
432 async def carriers_entity_verification_company_notification(
433 self,
434 obj: DriverDBModel
435 | TrailerDBModel
436 | TruckDBModel
437 | OrganizationDBModel,
438 ):
439 directors_of_comp = [
440 SotransUserDBModel(**u)
441 for u in await users_col.find_batch(
442 {"role": SotransRole.company_director.value}
443 )
444 ]
445 notification = self.builder.with_cast_many(
446 CastType.organization, directors_of_comp
447 )
448 if isinstance(obj, DriverDBModel):
449 notification = notification.with_preset_type(
450 "company_carriers_driver"
451 )
452 elif isinstance(obj, TruckDBModel):
453 notification = notification.with_preset_type(
454 "company_carriers_truck"
455 )
456 elif isinstance(obj, TrailerDBModel):
457 notification = notification.with_preset_type(
458 "company_carriers_trailer"
459 )
460 else:
461 notification = notification.with_preset_type(
462 "company_carriers_organization"
463 )
464 self._make_request(
465 notification.with_object(obj.model_dump(format_ids=False)).build()
466 )
468 def client_assigned(
469 self,
470 client: ClientDBModel,
471 managers: list[SotransUserDBModel] | SotransUserDBModel,
472 ):
473 notification = self.builder.with_preset_type(
474 "client_assigned"
475 ).with_object(client.model_dump(format_ids=False))
476 if isinstance(managers, list):
477 notification = notification.with_cast_many(
478 CastType.subdivision, managers
479 )
480 else:
481 notification = notification.with_cast_one(
482 CastType.client_responsible, managers
483 )
484 self._make_request(notification.build())
486 async def new_in_buffer(self, order: OrderDBModel):
487 notification = self.builder.with_preset_type(
488 "new_in_buffer"
489 ).with_object(order.model_dump(format_ids=False))
490 to_responsible = True
491 if not order.client.responsible:
492 to_responsible = False
493 if order.client.responsible:
494 notification = notification.with_cast_many(
495 CastType.client_responsible, order.client.responsible
496 )
497 if order.client.subsidiary and order.client.subsidiary.id:
498 sub_users = [
499 SotransUserDBModel(**u)
500 for u in await users_col.find_batch(
501 {
502 SotransUserDBFieldsModel.subsidiary_id: str(
503 order.client.subsidiary.id
504 )
505 }
506 )
507 ]
508 if not (sub_users or to_responsible):
509 return
510 notification = notification.with_cast_many(
511 CastType.subdivision, sub_users
512 )
513 else:
514 if not to_responsible:
515 return
516 self._make_request(notification.build())
518 async def _notify_subsidiaries_on_scraped(
519 self, clients: list[ClientToNotifyAbout]
520 ):
521 subdivision_counts: dict[ObjectId, int] = collections.defaultdict(int)
522 subsidiary_receivers: dict[
523 ObjectId, list[SotransUserDBModel]
524 ] = collections.defaultdict(list)
525 for client in clients:
526 if client.subsidiary is None or client.subsidiary.id is None:
527 continue
528 subdivision_counts[client.subsidiary.id] += 1
529 if client.subsidiary.id not in subsidiary_receivers:
530 users = await users_col.find_batch(
531 {
532 SotransUserDBFieldsModel.subsidiary_id: str(
533 client.subsidiary.id
534 )
535 }
536 )
537 subsidiary_receivers[client.subsidiary.id].extend(users)
538 for sc in subdivision_counts:
539 self._make_request(
540 self.builder.with_preset_type("orders_scraped")
541 .with_object({"count": subdivision_counts[sc]})
542 .with_cast_many(CastType.subdivision, subsidiary_receivers[sc])
543 .build()
544 )
546 def _notify_responsible_about_scraped(
547 self, clients: list[ClientToNotifyAbout]
548 ):
549 responsible_receivers: dict[uuid.UUID, int] = collections.defaultdict(
550 int
551 ) # receiver_id to count
552 users_map: dict[uuid.UUID, SotransUserDBModel] = {}
553 for cl in clients:
554 if cl.responsible is None:
555 continue
556 for r in cl.responsible:
557 responsible_receivers[r.id] += 1
558 if r.id not in users_map:
559 users_map[r.id] = r
560 for rr in responsible_receivers:
561 self._make_request(
562 self.builder.with_preset_type("orders_scraped")
563 .with_object({"count": responsible_receivers[rr]})
564 .with_cast_one(CastType.client_responsible, users_map[rr])
565 .build()
566 )
568 def _client_has_responsible(self, client: ClientDBModel):
569 sub = client.subsidiary and client.subsidiary.id
570 resp = bool(client.responsible)
571 return bool(resp or sub)
573 async def orders_scraped(
574 self,
575 scraped_orders: list[OrderDBModel],
576 container: ScrapedNotificationContainer,
577 ):
578 clients_to_notify = [
579 ClientToNotifyAbout(**order.client.model_dump())
580 for order in scraped_orders
581 if order.client and self._client_has_responsible(order.client)
582 ]
583 clients = container.order_by_clients + clients_to_notify
584 if len(clients) < MIN_COUNT_TO_NOTIFY_ON_SCRAPED:
585 container.order_by_clients = clients
586 return
587 self._notify_responsible_about_scraped(clients)
588 await self._notify_subsidiaries_on_scraped(clients)
589 container.order_by_clients = []
591 def confirmation_ended_company(self, order: OrderDBModel):
592 to_send = self.fill_companys_recepients(order)
593 if to_send is None:
594 return
595 notification = (
596 to_send.with_preset_type("company_confirmation_expired")
597 .with_object(order.model_dump(format_ids=False))
598 .build()
599 )
600 self._make_request(notification)
602 def user_registered(self, user: SotransUserDBModel):
603 self._make_request(
604 self.builder.with_preset_type("user_registered")
605 .with_cast_one(CastType.personal, user)
606 .with_object({"password": user.password})
607 .build()
608 )
610 def notify_with_recommended(self, employees: list[dict], count: int):
611 self._make_request(
612 self.builder.with_preset_type("order_recommended")
613 .with_cast_many(CastType.organization, employees)
614 .with_object({"count": count})
615 .build()
616 )
619notification_api = NotificationDirector(
620 NotificationConnector(
621 NotificationAPIConfig.NOTIFICATION_API_URL,
622 ),
623 NotificationBuilder(),
624)
627async def get_users_to_notify(
628 organization_id: ObjectId,
629) -> list[SotransUserDBModel]:
630 return [
631 SotransUserDBModel(**u)
632 for u in await users_col.find_batch(
633 {SotransUserDBFieldsModel.organization_id: str(organization_id)}
634 )
635 ]