Coverage for api/database/updater.py: 17%

120 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import copy 

2import uuid 

3from contextlib import suppress 

4from typing import Any, Literal 

5from uuid import UUID 

6 

7from bson import ObjectId 

8from dbcc import MongoTableEngine 

9from exceptions import SubsidiaryIsNotAssigned 

10from mongodb import ( 

11 archive_col, 

12 bids_col, 

13 buf_col, 

14 clients_col, 

15 orders_col, 

16 orgs_col, 

17 subsidiaries_col, 

18 trash_orders_col, 

19 users_col, 

20) 

21from pymongo import ReturnDocument 

22from pymongo.errors import OperationFailure, WriteError 

23from sotrans_models.models.misc.client import ClientDBModel 

24from sotrans_models.models.orders.bid import BidDBModel 

25from sotrans_models.models.orders.order import OrderDBModel 

26from sotrans_models.models.organizations import ( 

27 OrganizationDBModel, 

28 SubsidiaryDBModel, 

29) 

30from sotrans_models.models.users import SotransUserDBModel 

31from sotrans_models.utils.text_mappers import get_orders_text_search 

32from utils.helper import get_subsidiary_oid 

33 

34 

async def update_carrier_in_orders(
    table: MongoTableEngine,
    org_id: ObjectId,
    organization: dict[str, Any],
    needs_text_search: bool,
):
    """Write a fresh organization snapshot into the orders stored in ``table``.

    Two embedded copies are refreshed for every order that references the
    organization ``org_id``: the carrier of the best bid and the carrier of
    the order itself.

    Args:
        table: Order collection wrapper to update.
        org_id: Identifier of the organization whose snapshots are replaced.
        organization: Organization document to embed. It must contain
            ``"_id"``; the value is mirrored into ``"id"`` in place, so the
            caller's dict is mutated (callers in this module rely on that).
        needs_text_search: When true, orders are updated one by one and their
            ``text_search`` field is recomputed from the updated document;
            otherwise the carrier is replaced with a single bulk update.
    """
    # Mirror ``_id`` into ``id`` before ANY write. Previously this happened
    # after the best-bid update, so the first collection processed received a
    # snapshot without ``id`` while every later write included it.
    organization["id"] = organization["_id"]

    await table.collection.update_many(
        {OrderDBModel.best_bid.carrier.id: org_id},
        {"$set": {OrderDBModel.best_bid.carrier: organization}},
    )

    if not needs_text_search:
        await table.collection.update_many(
            {OrderDBModel.carrier.id: org_id},
            {"$set": {OrderDBModel.carrier: organization}},
        )
        return

    by_carrier = await table.find_batch({OrderDBModel.carrier.id: org_id})
    for o in by_carrier:
        # The carrier id is re-checked in the filter so an order that changed
        # carrier since the batch was read is left alone.
        order = await table.collection.find_one_and_update(
            {"_id": o["_id"], OrderDBModel.carrier.id: org_id},
            {"$set": {OrderDBModel.carrier: organization}},
            return_document=ReturnDocument.AFTER,
        )
        if order is None:
            continue
        await table.update_by_id(
            o["_id"],
            {
                OrderDBModel.text_search: get_orders_text_search(
                    OrderDBModel(**order)
                )
            },
        )

69 

70 

async def update_organization(
    organization: dict,
    organization_id: ObjectId,
    needs_text_search: bool = False,
):
    """Propagate an organization document to the orders and bids embedding it.

    Args:
        organization: Organization document to embed as the carrier.
        organization_id: Identifier used to find the referencing documents.
        needs_text_search: Forwarded to ``update_carrier_in_orders``.
    """
    # Order collections first (archive, active, trash), in that order.
    order_tables = (archive_col, orders_col, trash_orders_col)
    for order_table in order_tables:
        await update_carrier_in_orders(
            order_table,
            organization_id,
            organization,
            needs_text_search,
        )

    # Then every bid placed by this carrier.
    bid_filter = {BidDBModel.carrier.id: organization_id}
    bid_change = {"$set": {BidDBModel.carrier: organization}}
    await bids_col.collection.update_many(bid_filter, bid_change)

84 

85 

async def update_user_in_orders(
    table: MongoTableEngine,
    user_uuid: uuid.UUID,
    updated_user: dict[str, Any],
    to_update_vector: bool = False,
):
    """Replace the embedded copies of a user inside the orders of ``table``.

    Args:
        table: Order collection wrapper to update.
        user_uuid: Identifier of the user whose snapshots are replaced.
        updated_user: User document to embed.
        to_update_vector: When true, ``text_search`` is recomputed for every
            order assigned to the user after the employee snapshot is written.
    """
    orders = table.collection

    # 1. Assigned employee snapshot.
    await orders.update_many(
        {OrderDBModel.assigned.company.employee.id: user_uuid},
        {"$set": {OrderDBModel.assigned.company.employee: updated_user}},
    )

    # 2. Optional refresh of the text-search field of the affected orders.
    if to_update_vector:
        affected = await table.find_batch(
            {OrderDBModel.assigned.company.employee.id: user_uuid}
        )
        for order_doc in affected:
            await orders.update_one(
                {"_id": order_doc["_id"]},
                {
                    "$set": {
                        OrderDBModel.text_search: get_orders_text_search(
                            OrderDBModel(**order_doc),
                        )
                    }
                },
            )

    # 3. Owner of the best bid.
    await orders.update_many(
        {OrderDBModel.best_bid.owner.id: user_uuid},
        {"$set": {OrderDBModel.best_bid.owner: updated_user}},
    )

116 

117 

async def update_user(
    user_uuid: str | uuid.UUID,
    user_update: SotransUserDBModel,
    change_subsidiary: bool = False,
    update_vector_ts: bool = False,
):
    """Propagate a user update to every document that embeds the user.

    The stored user is merged with the non-``None`` fields of ``user_update``
    and the result is written into subsidiaries (optionally), organizations,
    clients, orders and bids. Nothing happens if the user is not found.

    Args:
        user_uuid: User identifier; a string is converted to ``uuid.UUID``.
        user_update: Model whose non-``None`` fields override the stored user.
        change_subsidiary: When true, the user is pulled from the subsidiary
            that currently lists them and re-attached (see below).
        update_vector_ts: Forwarded to ``update_user_in_orders`` as
            ``to_update_vector``.
    """
    if isinstance(user_uuid, str):
        user_uuid = uuid.UUID(user_uuid)
    employee = await users_col.find_single("id", user_uuid)
    if not employee:
        return
    updated_user = {
        **employee,
        **user_update.model_dump(exclude_none=True),
        "id": user_uuid,  # user model field
    }
    updated_user_model = SotransUserDBModel(**updated_user)
    org_update_needed: Literal[False] | dict[str, Any] = False
    # Bound up-front: previously ``subs`` only existed when
    # ``change_subsidiary`` was true, so the default call path crashed with
    # UnboundLocalError at the final ``subs is not None`` check.
    subs: dict[str, Any] | None = None
    update_org_or_sub_args = (
        {SubsidiaryDBModel.employees[0].id: user_uuid},
        {"$set": {f"{SubsidiaryDBModel.employees[0]}.$": updated_user}},
    )
    if change_subsidiary:
        await pull_employee_from_sub(user_uuid)
        # AFTER is requested so the subsidiary snapshot pushed into orders at
        # the end of this function already contains the updated employee.
        subs = await subsidiaries_col.collection.find_one_and_update(
            *update_org_or_sub_args,
            return_document=ReturnDocument.AFTER,
        )
        if not subs:
            # No subsidiary lists the user any more: attach them to the one
            # resolved from the updated model, if any is assigned.
            with suppress(SubsidiaryIsNotAssigned):
                subs_id = await get_subsidiary_oid(updated_user_model)
                try:
                    subs = await subsidiaries_col.collection.find_one_and_update(
                        {"_id": subs_id},
                        {"$push": {SubsidiaryDBModel.employees[0]: updated_user}},
                        return_document=ReturnDocument.AFTER,
                    )
                except OperationFailure:
                    # Presumably the employees field is not an array yet, so
                    # $push is rejected; create the list instead.
                    subs = await subsidiaries_col.collection.find_one_and_update(
                        {"_id": subs_id},
                        {"$set": {SubsidiaryDBModel.employees[0]: [updated_user]}},
                        return_document=ReturnDocument.AFTER,
                    )

    org_with_owner = await orgs_col.collection.find_one_and_update(
        {OrganizationDBModel.owner.id: user_uuid},
        {"$set": {OrganizationDBModel.owner: updated_user}},
        projection={"documents": 0},
        return_document=ReturnDocument.AFTER,
    )
    if org_with_owner:
        org_update_needed = org_with_owner
    org_with_contact = await orgs_col.collection.find_one_and_update(
        {OrganizationDBModel.contact_user.id: user_uuid},
        {"$set": {OrganizationDBModel.contact_user: updated_user}},
        projection={"documents": 0},
        return_document=ReturnDocument.AFTER,
    )
    if org_with_contact:
        # The later document wins; if it is the same organization it already
        # carries the owner change as well.
        org_update_needed = org_with_contact

    await clients_col.collection.update_many(
        {ClientDBModel.responsible[0].id: user_uuid},
        {"$set": {f"{ClientDBModel.responsible[0]}.$": updated_user}},
    )
    for col in (archive_col, trash_orders_col, orders_col, buf_col):
        # This call used to be missing ``await``: the update was never
        # awaited, so failures were lost and ordering was not guaranteed.
        await col.collection.update_many(
            {OrderDBModel.client.responsible[0].id: user_uuid},
            {
                "$set": {
                    f"{OrderDBModel.client.responsible[0]}.$": updated_user
                }
            },
        )

    await bids_col.collection.update_many(
        {BidDBModel.owner.id: user_uuid},
        {"$set": {BidDBModel.owner: updated_user}},
    )

    # update bids and logisticians
    for col in (archive_col, trash_orders_col, orders_col, buf_col):
        await update_user_in_orders(
            col, user_uuid, updated_user, update_vector_ts
        )

    if org_update_needed is not False:
        # Raw documents returned by find_one_and_update carry "_id";
        # remove_user_from_deps keys on the same field.
        await update_organization(org_update_needed, org_update_needed["_id"])

    if subs is not None:
        await update_subsidiary(SubsidiaryDBModel(**subs))

208 

209 

async def update_clients_in_orders(client_id: ObjectId, client_data: dict):
    """Replace the embedded client snapshot in every order of that client.

    Args:
        client_id: Identifier of the client whose snapshots are replaced.
        client_data: Client document to embed in the orders.
    """
    for col in (archive_col, trash_orders_col, orders_col, buf_col):
        # update_many, not update_one: a client can appear in many orders, and
        # update_one refreshed only a single matching order per collection.
        await col.collection.update_many(
            {OrderDBModel.client.id: client_id},
            {"$set": {OrderDBModel.client: client_data}},
        )

216 

217 

async def update_subsidiary(subsidiary: SubsidiaryDBModel):
    """Write the dumped subsidiary into every order assigned to it.

    Args:
        subsidiary: Model whose ``id`` selects the orders and whose
            ``model_dump()`` output replaces the embedded copy.
    """
    order_tables = (trash_orders_col, orders_col, buf_col, archive_col)
    for order_table in order_tables:
        selector = {
            OrderDBModel.assigned.company.subsidiary.id: subsidiary.id
        }
        change = {
            "$set": {
                OrderDBModel.assigned.company.subsidiary: subsidiary.model_dump()
            }
        }
        await order_table.collection.update_many(selector, change)

228 

229 

async def remove_subsidiary_from_deps(sub_id: ObjectId):
    """Set the assigned subsidiary to ``None`` in every order referencing it.

    Args:
        sub_id: Identifier of the subsidiary being detached.
    """
    order_tables = (trash_orders_col, orders_col, buf_col, archive_col)
    for order_table in order_tables:
        selector = {OrderDBModel.assigned.company.subsidiary.id: sub_id}
        change = {"$set": {OrderDBModel.assigned.company.subsidiary: None}}
        await order_table.collection.update_many(selector, change)

236 

237 

async def remove_client_from_deps(client_id: ObjectId):
    """Set the client to ``None`` in every order that references it.

    Args:
        client_id: Identifier of the client being detached.
    """
    order_tables = (trash_orders_col, orders_col, buf_col, archive_col)
    for order_table in order_tables:
        selector = {OrderDBModel.client.id: client_id}
        change = {"$set": {OrderDBModel.client: None}}
        await order_table.collection.update_many(selector, change)

244 

245 

async def remove_user_in_orders(col: MongoTableEngine, user_id: uuid.UUID):
    """Clear a user from the orders stored in ``col``.

    The assigned employee is cleared first, then the owner of the best bid;
    both are set to ``None`` wherever they reference ``user_id``.

    Args:
        col: Order collection wrapper to update.
        user_id: Identifier of the user being removed.
    """
    orders = col.collection

    employee_selector = {OrderDBModel.assigned.company.employee.id: user_id}
    employee_change = {"$set": {OrderDBModel.assigned.company.employee: None}}
    await orders.update_many(employee_selector, employee_change)

    owner_selector = {OrderDBModel.best_bid.owner.id: user_id}
    owner_change = {"$set": {OrderDBModel.best_bid.owner: None}}
    await orders.update_many(owner_selector, owner_change)

255 

256 

async def remove_user_from_deps(user_id: uuid.UUID | str):
    """Remove a user from every document that embeds them.

    The user is pulled from the subsidiary and client lists that contain
    them, and set to ``None`` where they are stored as a single value
    (organization owner / contact user, bid owner, order employee and
    best-bid owner). If an organization was changed, its new state is
    propagated through ``update_organization``.

    Args:
        user_id: User identifier; a string is converted to ``uuid.UUID``.
    """
    if isinstance(user_id, str):
        user_id = uuid.UUID(user_id)
    update_org_or_sub_args = (
        {SubsidiaryDBModel.employees[0].id: user_id},
        {"$pull": {SubsidiaryDBModel.employees[0]: {"id": user_id}}},
    )
    await subsidiaries_col.collection.update_one(*update_org_or_sub_args)
    up_org: Literal[False] | dict[str, Any] = False
    org = await orgs_col.collection.find_one_and_update(
        {OrganizationDBModel.owner.id: user_id},
        {"$set": {OrganizationDBModel.owner: None}},
        # Same projection as every other organization query feeding
        # update_organization, so "documents" is not copied into orders/bids.
        projection={"documents": 0},
        return_document=ReturnDocument.AFTER,
    )
    if org:
        up_org = org

    org = await orgs_col.collection.find_one_and_update(
        {OrganizationDBModel.contact_user.id: user_id},
        {"$set": {OrganizationDBModel.contact_user: None}},
        projection={"documents": 0},
        return_document=ReturnDocument.AFTER,
    )
    if org:
        up_org = org
    await clients_col.collection.update_many(
        {ClientDBModel.responsible[0].id: user_id},
        {"$pull": {ClientDBModel.responsible[0]: {"id": user_id}}},
    )
    for col in trash_orders_col, buf_col, orders_col, archive_col:
        # This call used to be missing ``await``: the update was never
        # awaited, so failures were lost and ordering was not guaranteed.
        await col.collection.update_many(
            {OrderDBModel.client.responsible[0].id: user_id},
            {"$pull": {OrderDBModel.client.responsible[0]: {"id": user_id}}},
        )
    await bids_col.collection.update_many(
        {BidDBModel.owner.id: user_id}, {"$set": {BidDBModel.owner: None}}
    )
    for col in trash_orders_col, orders_col, buf_col, archive_col:
        await remove_user_in_orders(col, user_id)
    if up_org is not False:
        await update_organization(up_org, up_org["_id"])

298 

299 

async def pull_employee_from_sub(employee_id: UUID):
    """Remove an employee from the subsidiary that lists them.

    When a subsidiary was changed, its updated state is propagated to orders
    through ``update_subsidiary``. A ``WriteError`` raised by the removal is
    ignored and treated as "nothing changed".

    Args:
        employee_id: Identifier of the employee to remove.
    """
    # Initialised so a suppressed WriteError leaves a defined value instead of
    # an unbound name at the check below.
    updated = None
    with suppress(WriteError):
        updated = await subsidiaries_col.collection.find_one_and_update(
            {"employees.id": employee_id},
            {"$pull": {"employees": {"id": employee_id}}},
            return_document=ReturnDocument.AFTER,
        )
    if not updated:
        return
    await update_subsidiary(SubsidiaryDBModel(**updated))

310 

311 

async def update_resource_in_order(
    resource_id: ObjectId,
    resource: dict[str, Any] | None,
    field: Literal["driver", "truck", "trailer"],
):
    """Replace an embedded driver, truck or trailer in all order collections.

    Args:
        resource_id: Identifier matched against ``"<field>.id"``.
        resource: New value for the field; ``None`` clears it.
        field: Name of the order field holding the resource.
    """
    order_tables = (orders_col, trash_orders_col, buf_col, archive_col)
    for order_table in order_tables:
        selector = {f"{field}.id": resource_id}
        change = {"$set": {field: resource}}
        await order_table.collection.update_many(selector, change)