Coverage for api/services/notifications/director.py: 17%

282 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import collections 

3import json 

4import uuid 

5from typing import Any 

6 

7from bson import ObjectId 

8from config import MIN_COUNT_TO_NOTIFY_ON_SCRAPED, NotificationAPIConfig 

9from logging_config import logger 

10from mongodb import docs_col, drivers_col, trailers_col, trucks_col, users_col 

11from pydantic import BaseModel 

12from services.microservice_connector import NotificationConnector 

13from services.notifications.builder import NotificationBuilder 

14from sotrans_models.models.misc.client import ClientDBModel 

15from sotrans_models.models.misc.document import DocumentDBModel 

16from sotrans_models.models.misc.verification import VerificationStatus 

17from sotrans_models.models.orders.bid import BidDBModel 

18from sotrans_models.models.orders.order import OrderDBModel, OrderStatus 

19from sotrans_models.models.organizations import ( 

20 OrganizationDBModel, 

21 SubsidiaryDBModel, 

22) 

23from sotrans_models.models.resources.drivers import DriverDBModel 

24from sotrans_models.models.resources.trailers import TrailerDBModel 

25from sotrans_models.models.resources.trucks import TruckDBModel 

26from sotrans_models.models.roles import SotransRole 

27from sotrans_models.models.services.notifications import ( 

28 CastType, 

29 NotificationCreateBatchModel, 

30 NotificationCreateModel, 

31 NotificationCreateMulticastModel, 

32) 

33from sotrans_models.models.users import ( 

34 SotransUserDBFieldsModel, 

35 SotransUserDBModel, 

36) 

37from utils.dt_utils import get_current_datetime 

38from utils.helper import is_organization_filled 

39 

40 

41class ClientToNotifyAbout(BaseModel): 

42 subsidiary: SubsidiaryDBModel | None = None 

43 responsible: list[SotransUserDBModel] | None = None 

44 

45 

46class ScrapedNotificationContainer(BaseModel): 

47 order_by_clients: list[ClientToNotifyAbout] 

48 

49 

50class NotificationDirector: 

51 connector: NotificationConnector 

52 builder: NotificationBuilder 

53 

54 def __init__( 

55 self, connector: NotificationConnector, builder: NotificationBuilder 

56 ): 

57 self.connector = connector 

58 self.builder = builder 

59 

60 def _make_request( 

61 self, 

62 notification_model: NotificationCreateBatchModel 

63 | NotificationCreateModel 

64 | NotificationCreateMulticastModel 

65 | None, 

66 ): 

67 if notification_model is None: 

68 return 

69 asyncio.create_task( 

70 self.connector.make_request( 

71 json.loads(notification_model.model_dump_json()), 

72 is_multicast=isinstance( 

73 notification_model, NotificationCreateMulticastModel 

74 ), 

75 ) 

76 ) 

77 

78 async def order_archived(self, order: OrderDBModel): 

79 if not (order.carrier and order.carrier.id): 

80 return 

81 users = await get_users_to_notify(order.carrier.id) 

82 if not (users or order.carrier.contact_user): 

83 return 

84 notification_model = self.builder.with_preset_type( 

85 "order_archived" 

86 ).with_object(order.model_dump(format_ids=False)) 

87 if users: 

88 notification_model = notification_model.with_cast_many( 

89 CastType.organization, users 

90 ) 

91 if order.carrier.contact_user: 

92 notification_model = notification_model.with_cast_one( 

93 CastType.personal, order.carrier.contact_user 

94 ) 

95 self._make_request(notification_model.build()) 

96 

97 async def success_inn_verification( 

98 self, organization: OrganizationDBModel 

99 ): 

100 users = await get_users_to_notify(organization.id) 

101 if len(users) == 0: 

102 if organization.owner is None: 

103 raise ValueError( 

104 "success_inn_verification notification requires employees or owner present in organization" 

105 ) 

106 users = [organization.owner] 

107 notification_model = ( 

108 self.builder.with_cast_many(CastType.organization, users) 

109 .with_preset_type("success_inn_verification") 

110 .with_object(organization.model_dump(format_ids=False)) 

111 .build() 

112 ) 

113 self._make_request(notification_model) 

114 

115 async def better_bid(self, order: OrderDBModel, previous_bid: BidDBModel): 

116 if not (previous_bid.carrier and previous_bid.carrier.id): 

117 return 

118 employees = await get_users_to_notify(previous_bid.carrier.id) 

119 notification_model = ( 

120 self.builder.with_cast_one(CastType.personal, previous_bid.owner) 

121 .with_cast_many(CastType.organization, employees) 

122 .with_preset_type("better_bid") 

123 .with_object(order.model_dump(format_ids=False)) 

124 .build() 

125 ) 

126 self._make_request(notification_model) 

127 

128 async def organization_filled( 

129 self, 

130 organization: dict, 

131 ): 

132 is_filled = is_organization_filled(organization) 

133 if is_filled is False: 

134 return None 

135 organization_model = OrganizationDBModel(**organization) 

136 employees = await get_users_to_notify(organization_model.id) 

137 notification_model = ( 

138 self.builder.with_cast_many(CastType.organization, employees) 

139 .with_preset_type("success_organization_filled") 

140 .with_object(organization_model.model_dump(format_ids=False)) 

141 .build() 

142 ) 

143 self._make_request(notification_model) 

144 

145 async def order_is_active(self, order: OrderDBModel): 

146 if not (order.carrier and order.carrier.id): 

147 return 

148 employees = await get_users_to_notify(order.carrier.id) 

149 notification_model = ( 

150 self.builder.with_cast_many(CastType.organization, employees) 

151 .with_preset_type("order_active") 

152 .with_cast_one(CastType.personal, order.carrier.contact_user) 

153 .with_object(order.model_dump(format_ids=False)) 

154 .build() 

155 ) 

156 self._make_request(notification_model) 

157 

158 async def order_confirmed(self, order: OrderDBModel): 

159 if not (order.carrier and order.carrier.id): 

160 return 

161 employees = await get_users_to_notify(order.carrier.id) 

162 notification = ( 

163 self.builder.with_cast_many(CastType.organization, employees) 

164 .with_preset_type("order_confirmed") 

165 .with_object(order.model_dump(format_ids=False)) 

166 ) 

167 if order.carrier and order.carrier.contact_user: 

168 notification = notification.with_cast_one( 

169 CastType.personal, order.carrier.contact_user 

170 ) 

171 self._make_request(notification.build()) 

172 

173 def admin_notification_on_parser( 

174 self, error_data: dict, admins: list[SotransUserDBModel] 

175 ): 

176 notification_model = ( 

177 self.builder.with_preset_type("parser_failed") 

178 .with_cast_many(CastType.organization, admins) 

179 .with_object(error_data) 

180 .build() 

181 ) 

182 self._make_request(notification_model=notification_model) 

183 

184 async def order_canceled(self, order: OrderDBModel): 

185 if not (order.carrier and order.carrier.id): 

186 return 

187 employees = await get_users_to_notify(order.carrier.id) 

188 if not (employees or order.carrier.contact_user): 

189 return 

190 notification = self.builder.with_preset_type( 

191 "carrier_canceled_order" 

192 ).with_object(order.model_dump(format_ids=False)) 

193 if employees: 

194 notification = notification.with_cast_many( 

195 CastType.organization, employees 

196 ) 

197 if order.carrier.contact_user: 

198 notification = notification.with_cast_one( 

199 CastType.personal, order.carrier.contact_user 

200 ) 

201 self._make_request(notification.build()) 

202 

203 async def order_reserved( 

204 self, 

205 order: OrderDBModel, 

206 ): 

207 if order.carrier is None: 

208 return 

209 employees: list[SotransUserDBModel] = await get_users_to_notify( 

210 order.carrier.id 

211 ) 

212 logger.warning(f"order_reserved, users: {len(employees)}") 

213 notification = ( 

214 self.builder.with_cast_many(CastType.organization, employees) 

215 .with_preset_type("order_reserved") 

216 .with_object(order.model_dump(format_ids=False)) 

217 ) 

218 if order.carrier and order.carrier.contact_user: 

219 notification = notification.with_cast_one( 

220 CastType.personal, order.carrier.contact_user 

221 ) 

222 self._make_request(notification.build()) 

223 

224 async def verification_result( 

225 self, 

226 organization: OrganizationDBModel, 

227 collection_name: str, 

228 related: dict[str, Any], 

229 status: VerificationStatus, 

230 ): 

231 if status == VerificationStatus.accepted: 

232 result = "success" 

233 else: 

234 result = "failed" 

235 if collection_name == docs_col.collection_name: 

236 obj = DocumentDBModel(**related) 

237 elif collection_name == drivers_col.collection_name: 

238 obj = DriverDBModel(**related) 

239 elif collection_name == trucks_col.collection_name: 

240 obj = TruckDBModel(**related) 

241 elif collection_name == trailers_col.collection_name: 

242 obj = TrailerDBModel(**related) 

243 else: 

244 obj = OrganizationDBModel(**related) 

245 entity_name = collection_name.rstrip("s") 

246 preset = "_".join((result, entity_name, "verification")) 

247 employees = await get_users_to_notify(organization.id) 

248 notification_model = ( 

249 self.builder.with_cast_many(CastType.organization, employees) 

250 .with_preset_type(preset) 

251 .with_object(obj.model_dump(format_ids=False)) 

252 .build() 

253 ) 

254 self._make_request(notification_model) 

255 

256 async def order_confirmed_timeout( 

257 self, order: OrderDBModel, minutes_left: int 

258 ): 

259 if order.carrier is None: 

260 return 

261 employees = await get_users_to_notify(order.carrier.id) 

262 notification = ( 

263 self.builder.with_cast_many(CastType.organization, employees) 

264 .with_preset_type("order_confirmed_timeout_warning") 

265 .with_object( 

266 { 

267 **order.model_dump(format_ids=False), 

268 "minutes_left": minutes_left, 

269 } 

270 ) 

271 ) 

272 if order.carrier and order.carrier.contact_user: 

273 notification = notification.with_cast_one( 

274 CastType.personal, order.carrier.contact_user 

275 ) 

276 self._make_request(notification.build()) 

277 

278 def fill_companys_recepients( 

279 self, order: OrderDBModel, made_by_user_id: uuid.UUID | None = None 

280 ) -> NotificationBuilder | None: 

281 if not (order.assigned and order.assigned.company): 

282 return None 

283 notification = self.builder 

284 

285 if order.assigned.company.subsidiary and ( 

286 order_subs_emps := ( 

287 order.assigned.company.subsidiary.employees 

288 if made_by_user_id is None 

289 or order.assigned.company.subsidiary.employees is None 

290 else [ 

291 e 

292 for e in order.assigned.company.subsidiary.employees 

293 if e.id != made_by_user_id 

294 ] 

295 ) 

296 ): 

297 notification = notification.with_cast_many( 

298 CastType.subdivision, order_subs_emps 

299 ) 

300 if order_emp := ( 

301 order.assigned.company.employee 

302 if order.assigned.company.employee 

303 and order.assigned.company.employee.id != made_by_user_id 

304 else None 

305 ): 

306 notification = notification.with_cast_one( 

307 CastType.order_responsible, order_emp 

308 ) 

309 return notification 

310 

311 def company_order_reserved(self, order: OrderDBModel): 

312 to_send = self.fill_companys_recepients(order) 

313 if to_send is None: 

314 return 

315 self._make_request( 

316 to_send.with_preset_type("company_order_reserved") 

317 .with_object( 

318 order.model_dump(format_ids=False), 

319 ) 

320 .build() 

321 ) 

322 

323 def company_order_canceled( 

324 self, order: OrderDBModel, by_user_id: uuid.UUID 

325 ): 

326 to_send = self.fill_companys_recepients(order, by_user_id) 

327 if to_send is None: 

328 return 

329 self._make_request( 

330 to_send.with_preset_type("company_canceled_order") 

331 .with_object( 

332 order.model_dump(format_ids=False), 

333 ) 

334 .build() 

335 ) 

336 

337 def company_order_confirmed(self, order: OrderDBModel): 

338 to_send = self.fill_companys_recepients(order) 

339 if to_send is None: 

340 return 

341 self._make_request( 

342 to_send.with_preset_type("company_confirmed_order") 

343 .with_object( 

344 order.model_dump(format_ids=False), 

345 ) 

346 .build() 

347 ) 

348 

349 def company_order_unverified(self, order: OrderDBModel): 

350 to_send = self.fill_companys_recepients(order) 

351 if to_send is None: 

352 return 

353 self._make_request( 

354 to_send.with_preset_type("company_unverified_order") 

355 .with_object( 

356 order.model_dump(format_ids=False), 

357 ) 

358 .build() 

359 ) 

360 

361 def company_order_completed(self, order: OrderDBModel): 

362 to_send = self.fill_companys_recepients(order) 

363 if to_send is None: 

364 return 

365 self._make_request( 

366 to_send.with_preset_type("company_order_completed") 

367 .with_object( 

368 order.model_dump(format_ids=False), 

369 ) 

370 .build() 

371 ) 

372 

373 def company_auction_ends_soon(self, order: OrderDBModel): 

374 to_send = self.fill_companys_recepients(order) 

375 if to_send is None: 

376 return 

377 current_dt = get_current_datetime() 

378 minutes_left = ( 

379 order.auction_end_time - current_dt.replace(tzinfo=None) 

380 ).total_seconds() // 60 

381 if minutes_left < 5: 

382 return 

383 self._make_request( 

384 to_send.with_preset_type("company_auction_ends_soon") 

385 .with_object( 

386 { 

387 **order.model_dump(format_ids=False), 

388 **{"minutes_left": minutes_left}, 

389 }, 

390 ) 

391 .build() 

392 ) 

393 

394 def company_auction_ended(self, order: OrderDBModel): 

395 to_send = self.fill_companys_recepients(order) 

396 if to_send is None: 

397 return 

398 if order.status == OrderStatus.canceled: 

399 to_send = to_send.with_preset_type("company_auction_ended_no_bid") 

400 else: 

401 to_send = to_send.with_preset_type("company_auction_ended_success") 

402 self._make_request( 

403 to_send.with_object( # type: ignore[union-attr] 

404 order.model_dump(format_ids=False), 

405 ).build() 

406 ) 

407 

408 def assignment(self, order: OrderDBModel): 

409 to_send: NotificationBuilder = self.fill_companys_recepients( # type: ignore[assignment] 

410 order 

411 ) 

412 if to_send is None: 

413 return 

414 if order.status == OrderStatus.appointment: 

415 to_send = to_send.with_preset_type( 

416 f"company_{OrderStatus.appointment.value}_assigned_order" 

417 ) 

418 elif order.status == OrderStatus.confirmed: 

419 to_send = to_send.with_preset_type( 

420 f"company_{OrderStatus.confirmed.value}_assigned_order" 

421 ) 

422 else: 

423 to_send = to_send.with_preset_type( 

424 f"company_{OrderStatus.exchange.value}_assigned_order" 

425 ) 

426 self._make_request( 

427 to_send.with_object( 

428 order.model_dump(format_ids=False), 

429 ).build() 

430 ) 

431 

432 async def carriers_entity_verification_company_notification( 

433 self, 

434 obj: DriverDBModel 

435 | TrailerDBModel 

436 | TruckDBModel 

437 | OrganizationDBModel, 

438 ): 

439 directors_of_comp = [ 

440 SotransUserDBModel(**u) 

441 for u in await users_col.find_batch( 

442 {"role": SotransRole.company_director.value} 

443 ) 

444 ] 

445 notification = self.builder.with_cast_many( 

446 CastType.organization, directors_of_comp 

447 ) 

448 if isinstance(obj, DriverDBModel): 

449 notification = notification.with_preset_type( 

450 "company_carriers_driver" 

451 ) 

452 elif isinstance(obj, TruckDBModel): 

453 notification = notification.with_preset_type( 

454 "company_carriers_truck" 

455 ) 

456 elif isinstance(obj, TrailerDBModel): 

457 notification = notification.with_preset_type( 

458 "company_carriers_trailer" 

459 ) 

460 else: 

461 notification = notification.with_preset_type( 

462 "company_carriers_organization" 

463 ) 

464 self._make_request( 

465 notification.with_object(obj.model_dump(format_ids=False)).build() 

466 ) 

467 

468 def client_assigned( 

469 self, 

470 client: ClientDBModel, 

471 managers: list[SotransUserDBModel] | SotransUserDBModel, 

472 ): 

473 notification = self.builder.with_preset_type( 

474 "client_assigned" 

475 ).with_object(client.model_dump(format_ids=False)) 

476 if isinstance(managers, list): 

477 notification = notification.with_cast_many( 

478 CastType.subdivision, managers 

479 ) 

480 else: 

481 notification = notification.with_cast_one( 

482 CastType.client_responsible, managers 

483 ) 

484 self._make_request(notification.build()) 

485 

486 async def new_in_buffer(self, order: OrderDBModel): 

487 notification = self.builder.with_preset_type( 

488 "new_in_buffer" 

489 ).with_object(order.model_dump(format_ids=False)) 

490 to_responsible = True 

491 if not order.client.responsible: 

492 to_responsible = False 

493 if order.client.responsible: 

494 notification = notification.with_cast_many( 

495 CastType.client_responsible, order.client.responsible 

496 ) 

497 if order.client.subsidiary and order.client.subsidiary.id: 

498 sub_users = [ 

499 SotransUserDBModel(**u) 

500 for u in await users_col.find_batch( 

501 { 

502 SotransUserDBFieldsModel.subsidiary_id: str( 

503 order.client.subsidiary.id 

504 ) 

505 } 

506 ) 

507 ] 

508 if not (sub_users or to_responsible): 

509 return 

510 notification = notification.with_cast_many( 

511 CastType.subdivision, sub_users 

512 ) 

513 else: 

514 if not to_responsible: 

515 return 

516 self._make_request(notification.build()) 

517 

518 async def _notify_subsidiaries_on_scraped( 

519 self, clients: list[ClientToNotifyAbout] 

520 ): 

521 subdivision_counts: dict[ObjectId, int] = collections.defaultdict(int) 

522 subsidiary_receivers: dict[ 

523 ObjectId, list[SotransUserDBModel] 

524 ] = collections.defaultdict(list) 

525 for client in clients: 

526 if client.subsidiary is None or client.subsidiary.id is None: 

527 continue 

528 subdivision_counts[client.subsidiary.id] += 1 

529 if client.subsidiary.id not in subsidiary_receivers: 

530 users = await users_col.find_batch( 

531 { 

532 SotransUserDBFieldsModel.subsidiary_id: str( 

533 client.subsidiary.id 

534 ) 

535 } 

536 ) 

537 subsidiary_receivers[client.subsidiary.id].extend(users) 

538 for sc in subdivision_counts: 

539 self._make_request( 

540 self.builder.with_preset_type("orders_scraped") 

541 .with_object({"count": subdivision_counts[sc]}) 

542 .with_cast_many(CastType.subdivision, subsidiary_receivers[sc]) 

543 .build() 

544 ) 

545 

546 def _notify_responsible_about_scraped( 

547 self, clients: list[ClientToNotifyAbout] 

548 ): 

549 responsible_receivers: dict[uuid.UUID, int] = collections.defaultdict( 

550 int 

551 ) # receiver_id to count 

552 users_map: dict[uuid.UUID, SotransUserDBModel] = {} 

553 for cl in clients: 

554 if cl.responsible is None: 

555 continue 

556 for r in cl.responsible: 

557 responsible_receivers[r.id] += 1 

558 if r.id not in users_map: 

559 users_map[r.id] = r 

560 for rr in responsible_receivers: 

561 self._make_request( 

562 self.builder.with_preset_type("orders_scraped") 

563 .with_object({"count": responsible_receivers[rr]}) 

564 .with_cast_one(CastType.client_responsible, users_map[rr]) 

565 .build() 

566 ) 

567 

568 def _client_has_responsible(self, client: ClientDBModel): 

569 sub = client.subsidiary and client.subsidiary.id 

570 resp = bool(client.responsible) 

571 return bool(resp or sub) 

572 

573 async def orders_scraped( 

574 self, 

575 scraped_orders: list[OrderDBModel], 

576 container: ScrapedNotificationContainer, 

577 ): 

578 clients_to_notify = [ 

579 ClientToNotifyAbout(**order.client.model_dump()) 

580 for order in scraped_orders 

581 if order.client and self._client_has_responsible(order.client) 

582 ] 

583 clients = container.order_by_clients + clients_to_notify 

584 if len(clients) < MIN_COUNT_TO_NOTIFY_ON_SCRAPED: 

585 container.order_by_clients = clients 

586 return 

587 self._notify_responsible_about_scraped(clients) 

588 await self._notify_subsidiaries_on_scraped(clients) 

589 container.order_by_clients = [] 

590 

591 def confirmation_ended_company(self, order: OrderDBModel): 

592 to_send = self.fill_companys_recepients(order) 

593 if to_send is None: 

594 return 

595 notification = ( 

596 to_send.with_preset_type("company_confirmation_expired") 

597 .with_object(order.model_dump(format_ids=False)) 

598 .build() 

599 ) 

600 self._make_request(notification) 

601 

602 def user_registered(self, user: SotransUserDBModel): 

603 self._make_request( 

604 self.builder.with_preset_type("user_registered") 

605 .with_cast_one(CastType.personal, user) 

606 .with_object({"password": user.password}) 

607 .build() 

608 ) 

609 

610 def notify_with_recommended(self, employees: list[dict], count: int): 

611 self._make_request( 

612 self.builder.with_preset_type("order_recommended") 

613 .with_cast_many(CastType.organization, employees) 

614 .with_object({"count": count}) 

615 .build() 

616 ) 

617 

618 

619notification_api = NotificationDirector( 

620 NotificationConnector( 

621 NotificationAPIConfig.NOTIFICATION_API_URL, 

622 ), 

623 NotificationBuilder(), 

624) 

625 

626 

627async def get_users_to_notify( 

628 organization_id: ObjectId, 

629) -> list[SotransUserDBModel]: 

630 return [ 

631 SotransUserDBModel(**u) 

632 for u in await users_col.find_batch( 

633 {SotransUserDBFieldsModel.organization_id: str(organization_id)} 

634 ) 

635 ]