Coverage for api/database/orders.py: 20%

81 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import copy 

2from datetime import timedelta 

3from typing import Any 

4 

5from bson import ObjectId 

6from database.entity import update_etag_and_text_search 

7from dbcc import MongoTableEngine 

8from exceptions import NotFoundHTTPError 

9from mongodb import buf_col, orders_col, settings_col, trash_orders_col 

10from operations.orders import clear_documents, create_order_with_second_phase 

11from pymongo import ReturnDocument 

12from sotrans_models.models.admin.settings import AdminSettings 

13from sotrans_models.models.orders.bid import BidDBModel 

14from sotrans_models.models.orders.order import OrderDBModel, OrderStatus 

15from sotrans_models.utils.text_mappers import get_orders_text_search 

16from utils.clear import clear_on_move 

17from utils.dt_utils import get_current_datetime 

18from utils.helper import ensure_yapi_location, etag_detalizer 

19 

20 

21async def get_confirmation_interval() -> int: 

22 settings = await settings_col.collection.find_one({}) 

23 settings = AdminSettings(**settings) if settings else AdminSettings() 

24 return settings.confirmation_interval_minutes 

25 

26 

27async def get_reservation_interval() -> int: 

28 settings = await settings_col.collection.find_one({}) 

29 settings = AdminSettings(**settings) if settings else AdminSettings() 

30 return settings.reservation_interval_minutes 

31 

32 

33async def confirm_order_and_return( 

34 order_id: ObjectId, 

35 carrier_data: dict[str, Any], 

36 best_bid: dict | None = None, 

37 restriction_query: dict[str, Any] | None = None, 

38 update_data: dict | None = None, 

39): 

40 """@:param carrier_data - data directly from database""" 

41 if not update_data: 

42 update_data = {} 

43 update = { 

44 OrderDBModel.status: OrderStatus.confirmed.value, 

45 OrderDBModel.confirmation_end_time: get_current_datetime() 

46 + timedelta(minutes=await get_confirmation_interval()), 

47 OrderDBModel.carrier: carrier_data, 

48 } 

49 if best_bid is not None: 

50 update[OrderDBModel.best_bid] = best_bid 

51 update[OrderDBModel.end_price] = best_bid[BidDBModel.value] 

52 if restriction_query is None: 

53 restriction_query = {} 

54 order = await orders_col.collection.find_one_and_update( 

55 restriction_query | {"id": order_id}, 

56 {"$set": update_data | update}, 

57 return_document=ReturnDocument.AFTER, 

58 ) 

59 if order is None: 

60 etag_container = copy.deepcopy(restriction_query) 

61 restriction_query.pop("etag", None) 

62 await etag_detalizer( 

63 orders_col, etag_container, restriction_query | {"id": order_id} 

64 ) 

65 raise NotFoundHTTPError("заказ") 

66 await update_etag_and_text_search( 

67 order, orders_col, OrderDBModel, get_orders_text_search 

68 ) 

69 return order 

70 

71 

72async def move_order_and_return( 

73 order_id: ObjectId, 

74 source_col: MongoTableEngine, 

75 target_col: MongoTableEngine, 

76 order_status: OrderStatus, 

77 restrictions_query: dict[str, Any] | None = None, 

78 updates: dict | None = None, 

79 order_data: dict[str, Any] | None = None, 

80) -> dict[str, Any]: 

81 if restrictions_query is None: 

82 restrictions_query = {} 

83 if order_data: 

84 await source_col.collection.delete_one( 

85 restrictions_query | {"id": order_id} 

86 ) 

87 else: 

88 order_data = await source_col.collection.find_one_and_delete( 

89 restrictions_query | {"id": order_id}, projection={"_id": 0} 

90 ) 

91 if order_data is None: 

92 raise NotFoundHTTPError("заказ(перемещён?)") 

93 await clear_documents(order_data.get("documents")) 

94 if updates is None: 

95 updates = {} 

96 order_data |= updates 

97 order_data[OrderDBModel.status] = order_status.value 

98 clear_on_move(order_data, order_status) 

99 if source_col == buf_col and target_col != trash_orders_col: 

100 await ensure_yapi_location(order_data) 

101 created = await create_order_with_second_phase( 

102 order_data, target_col, move=True 

103 ) 

104 return created.model_dump() 

105 

106 

107async def update_status_and_return( 

108 order_id: ObjectId, 

109 order_status: OrderStatus, 

110 set_: dict | None = None, 

111 push: dict | None = None, 

112 restriction: dict | None = None, 

113) -> dict[str, Any]: 

114 if restriction is None: 

115 restriction = {} 

116 if set_ is None: 

117 set_ = {} 

118 if order_status in (OrderStatus.exchange, OrderStatus.appointment): 

119 order_in = await orders_col.collection.find_one( 

120 {"id": order_id} | restriction 

121 ) 

122 if order_in is not None: 

123 await clear_documents(order_in.get("documents")) 

124 set_ |= { 

125 k: None 

126 for k in ( 

127 OrderDBModel.driver, 

128 OrderDBModel.carrier, 

129 OrderDBModel.truck, 

130 OrderDBModel.trailer, 

131 OrderDBModel.best_bid, 

132 OrderDBModel.document_draft, 

133 "documents", 

134 ) 

135 } 

136 update_q = {"$set": set_ | {OrderDBModel.status: order_status.value}} 

137 if push: 

138 update_q["$push"] = push 

139 order = await orders_col.collection.find_one_and_update( 

140 {"id": order_id} | restriction, 

141 update_q, 

142 return_document=ReturnDocument.AFTER, 

143 ) 

144 if order is None: 

145 etag_container = copy.deepcopy(restriction) 

146 restriction.pop("etag", None) 

147 await etag_detalizer( 

148 orders_col, etag_container, restriction | {"id": order_id} 

149 ) 

150 raise NotFoundHTTPError("order") 

151 await update_etag_and_text_search( 

152 order, orders_col, OrderDBModel, get_orders_text_search 

153 ) 

154 return order