Coverage for api/endpoints/notification_webhook.py: 48%
40 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2import datetime
4from fastapi import APIRouter, Depends
5from handlers.orders.confirm_orders import on_warn_confirmation_ends
6from handlers.orders.recommendations import notify_with_recommended_summary
7from keycloak import idp
8from mongodb import orders_col
9from services.notifications.director import notification_api
10from sotrans_models.models.orders.order import OrderDBModel, OrderStatus
11from sotrans_models.models.roles import SotransRole
12from sotrans_models.models.services.notification.scheduled import (
13 AuctionEndNotificationsData,
14)
15from utils.dt_utils import get_current_datetime
# Router-wide dependency: every route registered below first resolves
# idp.get_current_user with required_role_names=[SotransRole.scheduled].
_scheduled_user_dependency = Depends(
    idp.get_current_user(required_role_names=[SotransRole.scheduled])
)

notifications_router = APIRouter(
    prefix="/scheduled",
    dependencies=[_scheduled_user_dependency],
)
# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so a task whose result is discarded may be
# garbage-collected before it finishes.
_auction_finish_tasks: set[asyncio.Task] = set()


def _spawn_auction_finish_task(coro) -> None:
    """Schedule *coro* as a background task and keep it referenced until done."""
    task = asyncio.create_task(coro)
    _auction_finish_tasks.add(task)
    # Drop the reference once the task completes so the set does not grow.
    task.add_done_callback(_auction_finish_tasks.discard)


@notifications_router.put("/auction-finish")
async def auc_finish_scheduled_webhook(
    auc_end_data: AuctionEndNotificationsData,
):
    """Dispatch notifications for orders whose auction has finished.

    For every order in ``auc_end_data.orders_taken`` the ``order_reserved``
    notification is scheduled as a background task, then
    ``company_order_reserved`` and ``company_auction_ended`` are called.
    For every order in ``auc_end_data.orders_dropped`` the ``order_canceled``
    notification is scheduled as a background task, then
    ``company_auction_ended`` is called.

    The background tasks are not awaited, so the handler returns without
    waiting for them to complete.
    """
    # NOTE(review): the company_* calls are made without ``await`` —
    # presumably they are synchronous; confirm against notification_api.
    for o in auc_end_data.orders_taken:
        _spawn_auction_finish_task(notification_api.order_reserved(o))
        notification_api.company_order_reserved(o)
        notification_api.company_auction_ended(o)
    for o in auc_end_data.orders_dropped:
        _spawn_auction_finish_task(notification_api.order_canceled(o))
        notification_api.company_auction_ended(o)
@notifications_router.put("/auction-finishes-soon")
async def auc_soon():
    """Notify about orders in ``exchange`` status whose auction ends soon.

    The upper bound of the window is one hour after the current datetime and
    the lower bound is 45 minutes before that upper bound; both bounds are
    inclusive. ``notification_api.company_auction_ends_soon`` is called once
    for each matching order.
    """
    window_end = get_current_datetime() + datetime.timedelta(hours=1)
    window_start = window_end - datetime.timedelta(minutes=45)

    ending_soon_filter = {
        OrderDBModel.auction_end_time: {
            "$lte": window_end,
            "$gte": window_start,
        },
        OrderDBModel.status: OrderStatus.exchange.value,
    }
    matching_orders = await orders_col.find_batch(ending_soon_filter)

    for raw_order in matching_orders:
        # Each fetched document is expanded into an OrderDBModel before
        # being handed to the notification API.
        order = OrderDBModel(**raw_order)
        notification_api.company_auction_ends_soon(order)
# Strong references to in-flight fire-and-forget tasks. The event loop only
# keeps weak references to tasks, so a task whose result is discarded may be
# garbage-collected before it finishes.
_confirmation_expired_tasks: set[asyncio.Task] = set()


@notifications_router.post("/confirmation-expired")
async def notify_on_expired(orders: list[OrderDBModel]):
    """Dispatch notifications for orders whose confirmation has expired.

    For each order, ``confirmation_ended_company`` is called and the
    ``order_canceled`` notification is scheduled as a background task. The
    tasks are not awaited, so the handler returns without waiting for them.
    """
    for o in orders:
        # NOTE(review): called without ``await`` — presumably synchronous;
        # confirm against notification_api.
        notification_api.confirmation_ended_company(o)
        task = asyncio.create_task(notification_api.order_canceled(o))
        _confirmation_expired_tasks.add(task)
        # Drop the reference once the task completes so the set does not grow.
        task.add_done_callback(_confirmation_expired_tasks.discard)
@notifications_router.get("/confirmation_tick")
async def confirmed_timeout_warning():
    """Run ``on_warn_confirmation_ends`` and wait for it to finish.

    The handler itself returns nothing; any result of the awaited call is
    discarded.
    """
    await on_warn_confirmation_ends()
@notifications_router.get("/recommendations")
async def recommendations_reminder():
    """Run ``notify_with_recommended_summary`` and wait for it to finish.

    The handler itself returns nothing; any result of the awaited call is
    discarded.
    """
    await notify_with_recommended_summary()