Coverage for api/handlers/orders/external_orders.py: 15%

86 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2 

3import motor 

4from dbcc import MongoTableEngine 

5from errors import log_warning 

6from exceptions import BadParameterHTTPError, NotFoundHTTPError 

7from mongodb import buf_col, orders_col, trash_orders_col 

8from operations.orders import ( 

9 create_order_with_second_phase, 

10 prepare_to_create_order, 

11) 

12from pymongo import ReturnDocument 

13from pymongo.errors import WriteError 

14from services.notifications.director import ( 

15 ScrapedNotificationContainer, 

16 notification_api, 

17) 

18from services.recommendations import suggestion_api 

19from sotrans_models.models.orders.order import ( 

20 ExternalOrderStatus, 

21 OrderCreateModel, 

22 OrderDBModel, 

23 OrderStatus, 

24) 

25from sotrans_models.models.users import SotransOIDCUserModel 

26from utils.helper import update_order_address 

27 

28 

async def on_post_batch(
    executor: SotransOIDCUserModel,
    orders: list[OrderCreateModel],
    container: ScrapedNotificationContainer,
):
    """Insert a batch of scraped external orders into the buffer collection.

    For every order:

    * orders without an ``external`` section are skipped with a warning;
    * the incoming payload is stored in ``external.original_order``;
    * when ``external.unique_id`` is empty it is built as
      ``"<resource_name>_<id>"``; orders that have neither a unique id nor
      an ``external.id`` are skipped;
    * orders whose unique id is already present in the buffer collection,
      or was already queued earlier in this same batch, are skipped.

    The remaining orders are prepared and created one by one; an order whose
    preparation raises ``BadParameterHTTPError`` or ``NotFoundHTTPError`` is
    dropped without aborting the batch. If anything was inserted, a
    background task passes the inserted orders to
    ``notification_api.orders_scraped``.

    :param executor: user on whose behalf the orders are created.
    :param orders: scraped orders to insert.
    :param container: passed through to ``notification_api.orders_scraped``.
    """
    pending = []
    queued_ids = set()
    for order in orders:
        if not order.external:
            log_warning("No external order")
            continue
        # Snapshot the payload as received, before a unique id is synthesized.
        order.external.original_order = order.model_dump()
        if not order.external.unique_id:
            if not order.external.id:
                continue
            order.external.unique_id = (
                f"{order.external.resource_name}_{order.external.id}"
            )
        unique_id = order.external.unique_id
        # The duplicate check must also cover synthesized ids, otherwise the
        # same source order is inserted again on every scrape. Inserts happen
        # only after this loop, so repeats inside the batch are tracked here.
        if unique_id in queued_ids:
            continue
        if await buf_col.find_single("external.unique_id", unique_id):
            continue
        queued_ids.add(unique_id)
        # Not awaited yet: the awaitable is resolved in the second phase.
        pending.append(
            prepare_to_create_order(executor, order, by_external=True)
        )

    inserted_ids = []
    for prep in pending:
        try:
            order_data = await prep
        except (BadParameterHTTPError, NotFoundHTTPError):
            # An invalid order must not abort the rest of the batch.
            continue
        created = await create_order_with_second_phase(order_data, buf_col)
        inserted_ids.append(created.id)

    if not inserted_ids:
        # Nothing was created, so there is nothing to look up or notify about.
        return

    inserted = await buf_col.find_batch(
        {"id": {"$in": inserted_ids}}, projection={"client": 1}
    )
    if inserted:
        # NOTE(review): the task reference is not retained; asyncio only keeps
        # weak references to tasks - consider storing it until completion.
        asyncio.create_task(
            notification_api.orders_scraped(
                [OrderDBModel(**i) for i in inserted], container
            )
        )

73 

74 

async def on_patch_batch(orders: list[OrderCreateModel]):
    """Apply changes of scraped external orders to stored orders.

    Only the fields listed in ``delta_fields`` are overwritten (plus the
    ``from_exchange`` flag when it is set on the incoming order). The order
    is looked up by ``external.unique_id`` first in the buffer collection
    and, if not found there, in the orders collection. The previous values
    of the fields that changed are appended to the ``updated`` array of the
    stored document. For an order found in the orders collection, two
    background ``suggestion_api.update_order`` tasks are started
    (``target=False`` and ``target=True``).

    Orders without ``external`` or without ``external.unique_id`` are
    skipped with a warning.

    :param orders: scraped orders carrying the new field values.
    """
    # NOTE(review): the first entry indexes ``OrderDBModel.stops`` with 0;
    # confirm that the resulting key exists in ``model_dump()`` output.
    delta_fields = (
        OrderDBModel.stops[0],
        OrderDBModel.client_price,
        OrderDBModel.truck_body,
    )

    async def update_delta(
        collection: MongoTableEngine,
        match_query: dict,
        update_data_f: dict,
        previous_f: dict,
    ):
        """Append the pre-update values of the changed fields to ``updated``."""
        prev_data = {
            k: previous_f.get(k)
            for k in delta_fields
            if previous_f.get(k) != update_data_f[k]
        }
        try:
            await collection.collection.update_one(
                match_query,
                {"$push": {"updated": prev_data}},
            )
        except WriteError:
            # ``$push`` failed (presumably ``updated`` is not an array yet):
            # start the history with this entry instead.
            await collection.collection.update_one(
                match_query,
                {"$set": {"updated": [prev_data]}},
            )

    for order in orders:
        if not order.external or not order.external.unique_id:
            # A ``None`` unique id in the filter would also match documents
            # that have no ``external.unique_id`` at all and patch an
            # unrelated order.
            log_warning("No external unique id, order patch skipped")
            continue
        match_query = {"external.unique_id": order.external.unique_id}

        order_data = order.model_dump()
        from_exchange_delta = {}
        if order.external.from_exchange:
            from_exchange_delta = {OrderDBModel.external.from_exchange: True}
        await update_order_address(order_data, by_external=True)
        update_data = {k: order_data[k] for k in delta_fields}
        set_stage = {"$set": update_data | from_exchange_delta}

        # find_one_and_update returns the document as it was BEFORE the
        # update by default, which is what the delta has to be computed from.
        buffer_motor_col: motor.MotorCollection = buf_col.collection
        previous = await buffer_motor_col.find_one_and_update(
            match_query, set_stage
        )
        if previous:
            await update_delta(buf_col, match_query, update_data, previous)
            continue

        exchange_motor_col: motor.MotorCollection = orders_col.collection
        # The pre-update document is requested here as well: with
        # ReturnDocument.AFTER the "previous" values would be the new ones.
        previous = await exchange_motor_col.find_one_and_update(
            match_query, set_stage
        )
        if not previous:
            continue
        await update_delta(orders_col, match_query, update_data, previous)

        # Re-read the stored order so the suggestions get its current state.
        current = await exchange_motor_col.find_one(match_query)
        if not current:
            continue
        updated_model = OrderDBModel(**current)
        asyncio.create_task(
            suggestion_api.update_order(
                updated_model.id, updated_model, target=False
            )
        )
        asyncio.create_task(
            suggestion_api.update_order(
                updated_model.id, updated_model, target=True
            )
        )

135 

136 

async def on_delete_batch(orders: list[OrderDBModel]):
    """Remove external orders that disappeared from their source.

    Each order is matched by ``external.unique_id`` and handled by the first
    step that finds it:

    1. deleted from the buffer collection;
    2. deleted from the trash collection;
    3. if it is in the orders collection with status ``exchange`` or
       ``appointment``: deleted there, stored in the trash collection with
       ``external.status`` set to ``missing``, and two background
       ``suggestion_api.remove_order`` tasks are started;
    4. otherwise, if it is in the orders collection with status
       ``confirmed``, ``reserved``, ``unverified`` or ``active``: kept, only
       ``external.status`` is set to ``missing``.

    Orders without ``external`` or without ``external.unique_id`` are
    skipped with a warning.

    :param orders: orders reported as no longer present at the source.
    """
    missing_status = ExternalOrderStatus.missing.value
    removable_statuses = [
        OrderStatus.exchange.value,
        OrderStatus.appointment.value,
    ]
    kept_statuses = [
        OrderStatus.confirmed.value,
        OrderStatus.reserved.value,
        OrderStatus.unverified.value,
        OrderStatus.active.value,
    ]

    for order in orders:
        if not order.external or not order.external.unique_id:
            # A ``None`` unique id in the filter would also match documents
            # that have no ``external.unique_id`` at all, so an unrelated
            # order would be deleted.
            log_warning("No external unique id, order delete skipped")
            continue
        match_query = {"external.unique_id": order.external.unique_id}

        if await buf_col.collection.find_one_and_delete(match_query):
            continue

        trash_motor: motor.MotorCollection = trash_orders_col.collection
        if await trash_motor.find_one_and_delete(match_query):
            continue

        exch_del = await orders_col.collection.find_one_and_delete(
            match_query | {OrderDBModel.status: {"$in": removable_statuses}}
        )
        if exch_del:
            # Keep the removed order in the trash, marked as missing.
            exch_del["external"]["status"] = missing_status
            await trash_orders_col.create(exch_del)
            oid = exch_del["id"]
            asyncio.create_task(suggestion_api.remove_order(oid, target=False))
            asyncio.create_task(suggestion_api.remove_order(oid, target=True))
            continue

        # Orders in these statuses stay in place and are only flagged.
        await orders_col.collection.update_one(
            match_query | {OrderDBModel.status: {"$in": kept_statuses}},
            {"$set": {"external.status": missing_status}},
        )