Coverage for api/handlers/organizations.py: 13%

205 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import copy 

3from typing import Any 

4 

5import httpx 

6from bson import ObjectId 

7from database.entity import update_etag_and_text_search 

8from database.integration_1c.savers import update_in_1c 

9from database.text_search.mongo_search import update_query_by_search 

10from database.updater import update_organization 

11from exceptions import ( 

12 BadParameterHTTPError, 

13 INNNotFound, 

14 NoAccessHTTPError, 

15 NotAcceptableHTTPError, 

16 NotFoundHTTPError, 

17) 

18from fastapi import HTTPException 

19from handlers.authorization.check_role import assert_any_roles_access, has_role 

20from handlers.resources import documents_creation 

21from keycloak import idp 

22from mongodb import orgs_col, users_col 

23from pymongo import ReturnDocument 

24from services.dadata import verify_inn 

25from services.notifications.director import notification_api 

26from sotrans_models.models._base import InsertByUUIDModel 

27from sotrans_models.models.organizations import ( 

28 InnVerificationStatus, 

29 OrganizationCreateModel, 

30 OrganizationDBModel, 

31 OrganizationUpdateModel, 

32 UpdateINNPayload, 

33) 

34from sotrans_models.models.resources.drivers import DriverDBModel 

35from sotrans_models.models.resources.trailers import TrailerDBModel 

36from sotrans_models.models.resources.trucks import TruckDBModel 

37from sotrans_models.models.responses import ( 

38 GenericGetListResponse, 

39 ProfileCompletionResponse, 

40) 

41from sotrans_models.models.roles import SotransRole 

42from sotrans_models.models.users import ( 

43 SotransKeycloakUserInfoModel, 

44 SotransOIDCUserModel, 

45) 

46from sotrans_models.utils.text_mappers import get_organizations_text_search 

47from starlette import status 

48from utils.data_grabber import ( 

49 BaseGetListQueryParams, 

50 MongoDataGrabber, 

51 adjust_search_query, 

52 update_search_query, 

53) 

54from utils.dt_utils import get_current_datetime 

55from utils.helper import etag_detalizer, get_hash, get_org_oid 

56 

57 

58async def update_contact_user(insert: InsertByUUIDModel | None, data: dict): 

59 if not (insert and insert.id): 

60 return 

61 user = await users_col.find_single("id", insert.id) 

62 if not user: 

63 raise NotFoundHTTPError("contact user") 

64 data[OrganizationDBModel.contact_user] = user 

65 

66 

67async def on_create_organization( 

68 item: OrganizationCreateModel, user: SotransOIDCUserModel 

69) -> OrganizationDBModel | dict[str, Any]: 

70 existing_item = await orgs_col.find_single("owner.id", user.sub) 

71 if existing_item: 

72 raise HTTPException( 

73 status_code=status.HTTP_409_CONFLICT, 

74 detail=f"Пользователь {user.sub} уже является владельцем организации {existing_item['id']}", 

75 ) 

76 existing_item = await orgs_col.find_single("inn", item.inn) 

77 if existing_item: 

78 raise HTTPException( 

79 status_code=status.HTTP_409_CONFLICT, 

80 detail=f"Организация с ИНН {item.inn} уже существует.", 

81 ) 

82 user_dict = await users_col.find_single("id", user.sub) 

83 if user_dict is None: 

84 raise NotFoundHTTPError("пользователь") 

85 try: 

86 dadata_info = await verify_inn(item.inn) 

87 except httpx.HTTPStatusError: # Expired token, for example 

88 org_data = item.model_dump() 

89 if OrganizationDBModel.documents[0] in org_data: 

90 del org_data["documents"] 

91 org_data[OrganizationDBModel.contact_user] = user_dict 

92 full_item = OrganizationDBModel( 

93 created_at=get_current_datetime(), 

94 is_active=True, 

95 inn_verification_status=InnVerificationStatus.failed, 

96 owner=user_dict, 

97 **org_data, 

98 ) 

99 else: 

100 org_data = item.model_dump(exclude_none=True) 

101 org_data[OrganizationDBModel.contact_user] = user_dict 

102 full_item = OrganizationDBModel( 

103 created_at=get_current_datetime(), 

104 is_active=True, 

105 inn_verification_status=InnVerificationStatus.success, 

106 owner=user_dict, 

107 **{ 

108 **dadata_info.model_dump(), 

109 **org_data, 

110 }, 

111 ) 

112 full_item_dump = full_item.model_dump() 

113 created_item = await orgs_col.create( 

114 { 

115 **full_item_dump, 

116 **{ 

117 "etag": get_hash(full_item_dump), 

118 OrganizationDBModel.text_search: get_organizations_text_search( 

119 full_item 

120 ), 

121 }, 

122 } 

123 ) 

124 updated = await documents_creation(item, orgs_col, created_item, user) 

125 created_item = OrganizationDBModel(**created_item) 

126 user.organization_id = str(created_item.id) 

127 user.username = user.email 

128 user_data = user.model_dump() 

129 await idp.update_user(user.sub, SotransKeycloakUserInfoModel(**user_data)) 

130 await idp.add_user_roles([SotransRole.carrier_director], user.sub) 

131 asyncio.create_task( 

132 notification_api.success_inn_verification(created_item) 

133 ) 

134 asyncio.create_task( 

135 notification_api.carriers_entity_verification_company_notification( 

136 OrganizationDBModel(**updated) if updated else created_item 

137 ) 

138 ) 

139 if updated: # only if at least one document applied 

140 asyncio.create_task(notification_api.organization_filled(updated)) 

141 return updated 

142 return created_item 

143 

144 

145async def on_update_organization_by_id( 

146 id: ObjectId, 

147 item: OrganizationUpdateModel, 

148 user: SotransOIDCUserModel, 

149): 

150 etag_q = {} if item.etag is None else {"etag": item.etag} 

151 existing_item: dict = await orgs_col.collection.find_one( 

152 {"_id": id} | etag_q 

153 ) 

154 if not existing_item: 

155 await etag_detalizer(orgs_col, etag_q, {"_id": id}) 

156 raise BadParameterHTTPError("id") 

157 

158 existing_item_model: OrganizationDBModel = OrganizationDBModel( 

159 **existing_item 

160 ) 

161 if ( 

162 not has_role(user, SotransRole.company_director) 

163 and existing_item_model.owner 

164 and existing_item_model.owner.id != user.sub 

165 and existing_item_model.id != user.organization_id 

166 ): 

167 raise NoAccessHTTPError("организация") 

168 assert_any_roles_access( 

169 user, [SotransRole.carrier_director, SotransRole.company_director] 

170 ) 

171 org_data = item.model_dump(exclude_unset=True) 

172 

173 org_updated = {**existing_item, **org_data} 

174 if item.inn is not None: 

175 if existing_item_model.owner.id != user.sub: 

176 raise NoAccessHTTPError( 

177 "Только владелец организации может обновлять ИНН." 

178 ) 

179 if ( 

180 existing_item_model.inn_verification_status 

181 != InnVerificationStatus.failed 

182 ): 

183 raise NotAcceptableHTTPError("ИНН уже принят") 

184 

185 same_inn = await orgs_col.find_single("inn", item.inn) 

186 if same_inn: 

187 raise HTTPException( 

188 status_code=status.HTTP_409_CONFLICT, 

189 detail=f"Организация с ИНН {item.inn} уже существует.", 

190 ) 

191 try: 

192 org = await verify_inn(item.inn) 

193 except INNNotFound: 

194 pass 

195 else: 

196 org_updated = { 

197 **existing_item, 

198 **org.model_dump(exclude_unset=True), 

199 **org_data, 

200 } 

201 

202 if OrganizationDBModel.contact_user in org_updated: 

203 del org_updated[OrganizationDBModel.contact_user] 

204 await update_contact_user(item.contact_user, org_updated) 

205 org_updated[ 

206 OrganizationDBModel.text_search 

207 ] = get_organizations_text_search(OrganizationDBModel(**org_updated)) 

208 updated = await orgs_col.collection.find_one_and_update( 

209 {"_id": id}, 

210 {"$set": org_updated}, 

211 return_document=ReturnDocument.AFTER, 

212 ) 

213 if updated is None: 

214 raise NotFoundHTTPError("организация") 

215 await update_etag_and_text_search( 

216 updated, orgs_col, OrganizationDBModel, get_organizations_text_search 

217 ) 

218 om = OrganizationDBModel(**updated) 

219 asyncio.create_task(update_in_1c(id, orgs_col.collection_name, om)) 

220 asyncio.create_task(notification_api.organization_filled(updated)) 

221 org_for_inner_update = copy.deepcopy(updated) 

222 org_for_inner_update.pop("documents", None) 

223 asyncio.create_task( 

224 update_organization(org_for_inner_update, id, needs_text_search=True) 

225 ) 

226 return om 

227 

228 

229async def on_inn_update( 

230 organization_id: ObjectId, 

231 inn_payload: UpdateINNPayload, 

232 user: SotransOIDCUserModel, 

233) -> OrganizationDBModel | dict[str, Any]: 

234 org = await orgs_col.find_single("_id", ObjectId(organization_id)) 

235 if not org: 

236 raise NotFoundHTTPError("organization") 

237 if ( 

238 not OrganizationDBModel(**org).inn_verification_status 

239 == InnVerificationStatus.failed 

240 ): 

241 raise NotAcceptableHTTPError("INN allready accepted") 

242 if user.sub != org["owner"]["id"]: 

243 raise NoAccessHTTPError("Director only") 

244 try: 

245 org = await verify_inn(inn_payload.inn) 

246 except INNNotFound: 

247 updated = await orgs_col.collection.find_one_and_update( 

248 {"_id": organization_id}, 

249 { 

250 "$set": { 

251 "inn": inn_payload.inn, 

252 OrganizationDBModel.inn_verification_status: InnVerificationStatus.failed.value, 

253 } 

254 }, 

255 return_document=ReturnDocument.AFTER, 

256 ) 

257 else: 

258 updated = await orgs_col.collection.find_one_and_update( 

259 {"_id": organization_id}, 

260 { 

261 "$set": { 

262 "inn": inn_payload.inn, 

263 OrganizationDBModel.inn_verification_status: InnVerificationStatus.success.value, 

264 **org.model_dump(exclude_none=True), 

265 } 

266 }, 

267 return_document=ReturnDocument.AFTER, 

268 ) 

269 if not updated: 

270 raise NotFoundHTTPError("organization") 

271 await update_etag_and_text_search( 

272 updated, 

273 orgs_col, 

274 OrganizationDBModel, 

275 get_organizations_text_search, 

276 ) 

277 om = OrganizationDBModel(**updated) 

278 asyncio.create_task( 

279 update_in_1c(organization_id, orgs_col.collection_name, om) 

280 ) 

281 asyncio.create_task(notification_api.organization_filled(updated)) 

282 return om 

283 

284 

285async def on_get_organization_suggestion( 

286 search_q: str, 

287 limit: int, 

288 skip: int, 

289) -> list[OrganizationDBModel]: 

290 query: dict = {} 

291 if search_q: 

292 await update_query_by_search(orgs_col, search_q, query) 

293 orgs = await orgs_col.find_batch( 

294 query, skip, limit, [(OrganizationDBModel.verification.status, -1)] 

295 ) 

296 return [OrganizationDBModel(**org) for org in orgs] 

297 

298 

299async def on_get_org_completion( # noqa: C901 

300 org_id: ObjectId, 

301) -> ProfileCompletionResponse | dict[str, Any]: 

302 org = await orgs_col.find_single("_id", org_id) 

303 if not org: 

304 raise NotFoundHTTPError("организация") 

305 org_model = OrganizationDBModel(**org) 

306 missing_fields = [] 

307 fields = 16 

308 if not org_model.name: 

309 missing_fields.append("Имя организации") 

310 fields -= 1 

311 if not org_model.short_name: 

312 missing_fields.append("Краткое имя") 

313 fields -= 1 

314 if not org_model.head: 

315 missing_fields.append("Глава организации") 

316 fields -= 1 

317 if not org_model.ownership_type: 

318 missing_fields.append("Форма собственности") 

319 fields -= 1 

320 if not org_model.factual_address: 

321 missing_fields.append("Фактический адрес") 

322 fields -= 1 

323 if not org_model.legal_address: 

324 missing_fields.append("Юридический адрес") 

325 fields -= 1 

326 if not org_model.phone: 

327 missing_fields.append("Номер телефона") 

328 fields -= 1 

329 if not org_model.email: 

330 missing_fields.append("E-mail") 

331 fields -= 1 

332 if not org_model.ogrn: 

333 missing_fields.append("ОГРН") 

334 fields -= 1 

335 if not org_model.kpp: 

336 missing_fields.append("КПП") 

337 fields -= 1 

338 if not org_model.taxation_type: 

339 missing_fields.append("Тип налогообложения") 

340 fields -= 1 

341 if not org_model.bank: 

342 missing_fields.append("Банк") 

343 fields -= 1 

344 if not org_model.bik: 

345 missing_fields.append("БИК") 

346 fields -= 1 

347 if not org_model.ifns_code: 

348 missing_fields.append("Код ИФНС") 

349 fields -= 1 

350 if not org_model.registration_date: 

351 fields -= 1 

352 missing_fields.append("Дата регистрации") 

353 if not org_model.contact_user: 

354 fields -= 1 

355 missing_fields.append("Контактное лицо") 

356 filled_percentage = round(100 * fields / 16, 2) 

357 return ProfileCompletionResponse( 

358 total_percentage=filled_percentage, missing=missing_fields 

359 ) 

360 

361 

362async def on_get_organization_resources( 

363 grabber: MongoDataGrabber, 

364 user: SotransOIDCUserModel, 

365 org_id: ObjectId, 

366 params: BaseGetListQueryParams, 

367) -> GenericGetListResponse[DriverDBModel | TrailerDBModel | TruckDBModel]: 

368 if not has_role( 

369 user, SotransRole.company_logistician 

370 ) and org_id != get_org_oid(user): 

371 raise NoAccessHTTPError("организация") 

372 if not org_id: 

373 org_id = get_org_oid(user) 

374 params.where = update_search_query( 

375 params.where, {DriverDBModel.organization_id: str(org_id)} 

376 ) 

377 resources = await grabber.get_list(params) 

378 return resources