Coverage for api/handlers/users.py: 13%

385 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2import collections 

3import json 

4import uuid 

5from contextlib import suppress 

6 

7from bson import ObjectId 

8from bson.errors import InvalidId 

9from database.updater import ( 

10 remove_user_from_deps, 

11 update_subsidiary, 

12 update_user, 

13) 

14from errors import log_error, log_warning 

15from exceptions import ( 

16 BadParameterHTTPError, 

17 NoAccessHTTPError, 

18 NotFoundHTTPError, 

19) 

20from fastapi import HTTPException 

21from handlers.authorization.check_role import has_role 

22from handlers.grabbers.clients import clients_grabber 

23from handlers.grabbers.orders import orders_data_grabber 

24from handlers.grabbers.users import users_data_grabber 

25from keycloak import idp 

26from mongodb import ( 

27 cm_clients_col, 

28 subsidiaries_col, 

29 users_col, 

30 users_webhook_col, 

31) 

32from pydantic import BaseModel 

33from pymongo import ReturnDocument 

34from pymongo.errors import OperationFailure 

35from services.minio_policy_setter import MinioPolicySetter 

36from services.notifications.director import notification_api 

37from services.telegram import GROUP_TYPE, group_notification 

38from sotrans_fastapi_keycloak._uuid import PydanticUUID 

39from sotrans_fastapi_keycloak.api import get_keycloak_user_model 

40from sotrans_fastapi_keycloak.exceptions import UserNotFound 

41from sotrans_fastapi_keycloak.model import ( 

42 HTTPMethod, 

43 KeycloakRoleModel, 

44 user_model_attributes_serializer, 

45) 

46from sotrans_models.models.misc.client import ( 

47 ClientDBModel, 

48 ManagersClientsDBModel, 

49) 

50from sotrans_models.models.orders.order import OrderDBModel 

51from sotrans_models.models.organizations import SubsidiaryDBModel 

52from sotrans_models.models.responses import GenericGetListResponse 

53from sotrans_models.models.roles import SotransRole 

54from sotrans_models.models.users import ( 

55 CreateUserWithRoleModel, 

56 SotransKeycloakUserCreateModel, 

57 SotransKeycloakUserModel, 

58 SotransKeycloakUserPublicUpdateModel, 

59 SotransKeycloakUserUpdateModel, 

60 SotransOIDCUserModel, 

61 SotransUserDBFieldsModel, 

62 SotransUserDBModel, 

63 UpdateKeycloakUserWebhookModel, 

64 UpdateRoleModel, 

65) 

66from sotrans_models.utils.text_mappers import get_users_text_search 

67from starlette import status 

68from utils.data_grabber import ( 

69 BaseGetListQueryParams, 

70 BaseGetOneQueryParams, 

71 adjust_search_query, 

72 update_search_query, 

73) 

74from utils.helper import ( 

75 clear_user_for_keycloak, 

76 get_org_oid, 

77 secure_pass_generator, 

78) 

79 

80company_hierarchy = [ 

81 SotransRole.admin, 

82 SotransRole.company_director, 

83 SotransRole.company_manager, 

84 SotransRole.company_logistician, 

85] 

86 

87carrier_hierarchy = [ 

88 SotransRole.carrier_director, 

89 SotransRole.carrier_logistician, 

90] 

91 

92 

93def get_most_role(roles: list[str]) -> str: 

94 if not roles: 

95 return "" 

96 for comp_role in company_hierarchy: 

97 if comp_role.value in roles: 

98 return comp_role.value 

99 for car_role in carrier_hierarchy: 

100 if car_role.value in roles: 

101 return car_role.value 

102 return "" 

103 

104 

105async def _assert_director_access( 

106 user_id: str | None, accessor: SotransOIDCUserModel, access_for: str 

107): 

108 user_roles = accessor.roles 

109 if not ( 

110 idp.check_roles(user_roles, [SotransRole.company_director]) 

111 or idp.check_roles(user_roles, [SotransRole.carrier_director]) 

112 ): 

113 raise NoAccessHTTPError("роль") 

114 if user_id is None: 

115 return 

116 user = await users_col.find_single("id", uuid.UUID(user_id)) 

117 if user is None: 

118 raise NotFoundHTTPError("пользователь") 

119 if not has_role( 

120 accessor, SotransRole.company_director 

121 ) and SotransUserDBModel(**user).organization_id != str( 

122 get_org_oid(executor=accessor) 

123 ): 

124 raise NoAccessHTTPError(f"{access_for} директором") 

125 

126 

127def _assert_user_access(executor): 

128 if not idp.check_roles(executor.roles, [SotransRole.company_logistician]): 

129 raise NoAccessHTTPError("Different organization") 

130 

131 

132async def on_create_users( 

133 creation_data: CreateUserWithRoleModel, 

134 executor: SotransOIDCUserModel, 

135 minio_policy_setter=MinioPolicySetter(), 

136): 

137 await _assert_director_access(None, executor, "create users") 

138 # creation_data = SotransKeycloakUserCreateModel( 

139 # **creation_data.model_dump(exclude_unset=True), 

140 # organization_id=executor.organization_id 

141 # ) 

142 role = creation_data.role 

143 creation_data = SotransKeycloakUserCreateModel( 

144 **creation_data.model_dump() 

145 ) 

146 creation_data.organization_id = executor.organization_id 

147 

148 initial_roles = [] 

149 if has_role(executor, SotransRole.company_director): 

150 company_employees = ( 

151 SotransRole.company_director, 

152 SotransRole.company_manager, 

153 SotransRole.company_logistician, 

154 ) 

155 if role and role in company_employees: 

156 initial_roles = [role] 

157 else: 

158 initial_roles = [SotransRole.company_logistician] 

159 elif has_role(executor, SotransRole.carrier_director): 

160 carrier_employees = ( 

161 SotransRole.carrier_director, 

162 SotransRole.carrier_logistician, 

163 ) 

164 

165 if role and role in carrier_employees: 

166 initial_roles = [role] 

167 else: 

168 initial_roles = [SotransRole.carrier_logistician] 

169 

170 password = None 

171 if creation_data.password is None: 

172 password = secure_pass_generator() 

173 creation_data.password = password 

174 

175 # set minio policy according to roles and produce new model 

176 updated_creation_data = ( 

177 minio_policy_setter.create_new_model_with_minio_policy_attr( 

178 creation_data, initial_roles 

179 ) 

180 ) 

181 

182 user: SotransKeycloakUserModel = await idp.create_user( 

183 model=updated_creation_data, 

184 send_email_verification=False, 

185 initial_roles=initial_roles, 

186 ) 

187 new_user = SotransUserDBModel( 

188 **{**user.model_dump(), **dict(roles=initial_roles)} 

189 ) 

190 new_user.role = get_most_role(initial_roles) 

191 

192 new_user.password = password 

193 notification_api.user_registered(new_user) 

194 return new_user 

195 

196 

197async def on_get_users( 

198 executor: SotransOIDCUserModel, params: BaseGetListQueryParams 

199) -> GenericGetListResponse[SotransUserDBModel]: 

200 user_org_oid = get_org_oid(executor) 

201 query = {SotransUserDBFieldsModel.organization_id: str(user_org_oid)} 

202 params.where = update_search_query(params.where, query) 

203 return await users_data_grabber.get_list(params) 

204 

205 

206async def on_get_user( 

207 user_id: uuid.UUID, 

208 executor: SotransOIDCUserModel, 

209 params: BaseGetOneQueryParams, 

210): 

211 q = { 

212 SotransUserDBFieldsModel.organization_id: str(get_org_oid(executor)), 

213 SotransUserDBFieldsModel.id: user_id, 

214 } 

215 user = await users_col.collection.find_one(q, projection=params.projection) 

216 if user is None: 

217 raise NotFoundHTTPError("пользователь") 

218 return user 

219 

220 

221def update_kc_int(d, u): 

222 for k, v in u.items(): 

223 if isinstance(v, collections.abc.Mapping): 

224 d[k] = update_kc_int(d.get(k, {}), v) 

225 else: 

226 d[k] = v 

227 return d 

228 

229 

230async def on_update_user( 

231 user_id: str, 

232 update_data: SotransKeycloakUserUpdateModel 

233 | SotransKeycloakUserPublicUpdateModel, 

234 executor: SotransOIDCUserModel, 

235): 

236 if isinstance(update_data, SotransKeycloakUserUpdateModel): 

237 await _assert_director_access( 

238 user_id, executor, "обновление приватной информации" 

239 ) 

240 user = await users_col.find_single("id", uuid.UUID(user_id)) 

241 if user is None: 

242 raise NotFoundHTTPError("пользователь") 

243 if update_data.password is not None: 

244 await idp.change_password(user_id, update_data.password) 

245 clear_user_for_keycloak(user) 

246 user_kc_model = SotransUserDBModel(**user) 

247 user_in_db_kc = user_model_attributes_serializer(user_kc_model) 

248 user_update_data_kc = get_keycloak_user_model(update_data) 

249 

250 update_kc_int(user_in_db_kc, user_update_data_kc) 

251 clear_user_for_keycloak(user_in_db_kc) 

252 

253 response = await idp._admin_request( 

254 url=f"{idp.users_uri}/{user_id}", 

255 data=user_in_db_kc, 

256 method=HTTPMethod.PUT, 

257 ) 

258 if response.status == 204: # Update successful 

259 with suppress(ValueError): 

260 role = SotransRole(update_data.role) 

261 carrier_employees = ( 

262 SotransRole.carrier_director, 

263 SotransRole.carrier_logistician, 

264 ) 

265 company_employees = ( 

266 SotransRole.company_director, 

267 SotransRole.company_manager, 

268 SotransRole.company_logistician, 

269 ) 

270 

271 async def update_role(): 

272 current_roles = await idp.get_user_roles(user_id) 

273 await idp.remove_user_roles( 

274 [cr.name for cr in current_roles], user_id 

275 ) 

276 await idp.add_user_roles([role], user_id) 

277 

278 if has_role(executor, SotransRole.company_director): 

279 if role in company_employees: 

280 await update_role() 

281 elif has_role(executor, SotransRole.carrier_director): 

282 if role in carrier_employees: 

283 await update_role() 

284 

285 sync_update = {**user, **update_data.model_dump(exclude_unset=True)} 

286 sync_up_model = SotransUserDBModel(**sync_update) 

287 if sync_up_model.roles and update_data.role: 

288 sync_up_model.roles = [update_data.role] 

289 

290 return await users_col.collection.find_one_and_update( 

291 {"id": sync_up_model.id}, 

292 {"$set": sync_up_model.model_dump(exclude_unset=True)}, 

293 return_document=ReturnDocument.AFTER, 

294 ) 

295 

296 log_error(f"{response.status}: {await response.content.read()}") 

297 raise HTTPException( 

298 status.HTTP_503_SERVICE_UNAVAILABLE, 

299 "Запрос на обновление данных пользователя был провален.", 

300 ) 

301 

302 

303async def on_delete_user(user_id: str, executor: SotransOIDCUserModel) -> dict: 

304 await _assert_director_access(user_id, executor, "удаление пользователя") 

305 return await idp.delete_user(user_id=user_id) 

306 

307 

308async def update_users_roles( 

309 user_id: str, executor: SotransOIDCUserModel, role: SotransRole 

310): 

311 if ( 

312 await idp.get_user(user_id=user_id) 

313 ).organization_id != executor.organization_id: 

314 raise NoAccessHTTPError("Employee") 

315 carrier_employees = ( 

316 SotransRole.carrier_director, 

317 SotransRole.carrier_logistician, 

318 ) 

319 company_employees = ( 

320 SotransRole.company_director, 

321 SotransRole.company_manager, 

322 SotransRole.company_logistician, 

323 ) 

324 carrier_chief = ( 

325 has_role(executor, SotransRole.carrier_director) 

326 and role in carrier_employees 

327 ) 

328 company_chief = ( 

329 has_role(executor, SotransRole.company_director) 

330 and role in company_employees 

331 ) 

332 

333 if not (company_chief or carrier_chief): 

334 raise NoAccessHTTPError("Roles") 

335 current_roles = await idp.get_user_roles(user_id) 

336 cur_roles_set = {cr.name for cr in current_roles} 

337 if company_chief and not cur_roles_set & set(company_employees): 

338 raise NoAccessHTTPError("Employees role") 

339 if carrier_chief and not cur_roles_set & set(carrier_employees): 

340 raise NoAccessHTTPError("Employees role") 

341 # 'default-roles-sotransdev' gives ability to edit data in keycloack 

342 # resulting_roles: list[str] = [] 

343 # with suppress(ValueError): 

344 # current_roles.remove('default-roles-sotransdev') 

345 # resulting_roles.append('default-roles-sotransdev') 

346 await idp.remove_user_roles([cr.name for cr in current_roles], user_id) 

347 await idp.add_user_roles([role], user_id) 

348 return UpdateRoleModel(role=role) 

349 

350 

351async def update_subsidiary_by_user_id( 

352 user_id: str, sub_id: str | None, needs_result: bool 

353) -> SotransKeycloakUserModel | None: 

354 user = SotransKeycloakUserUpdateModel( 

355 **(await idp.get_user(user_id)).model_dump() 

356 ) 

357 user.subsidiary_id = sub_id 

358 user_data = get_keycloak_user_model(await idp.get_user(user_id)) 

359 updated_user_data = get_keycloak_user_model(user) 

360 update_kc_int(user_data, updated_user_data) 

361 clear_user_for_keycloak(user_data) 

362 await idp._admin_request( 

363 url=f"{idp.users_uri}/{user_id}", data=user_data, method=HTTPMethod.PUT 

364 ) 

365 if needs_result is False: 

366 return None 

367 return await idp.get_user(user_id=user_id) 

368 

369 

370async def assign_sub_to_user( 

371 user_id: str, sub_id: str | None, needs_result: bool = False 

372) -> SotransKeycloakUserModel | None: 

373 user = await update_subsidiary_by_user_id(user_id, sub_id, needs_result) 

374 if needs_result is False: 

375 return None 

376 return user 

377 

378 

379async def on_update_subsidiary( 

380 user_id: str, sub_id: str 

381) -> SotransKeycloakUserModel: 

382 try: 

383 oid = ObjectId(sub_id) 

384 except InvalidId: 

385 raise BadParameterHTTPError("sub_id") 

386 named_sub = await subsidiaries_col.find_single( 

387 "_id", oid, projection={"name": 1} 

388 ) 

389 if named_sub is None: 

390 raise NotFoundHTTPError("подразделение") 

391 try: 

392 return await assign_sub_to_user(user_id, sub_id, needs_result=True) 

393 except UserNotFound: 

394 raise NotFoundHTTPError("пользователь") 

395 

396 

397def parse_role(data: dict) -> SotransRole | None: 

398 role_update: str = data["representation"] 

399 role_update_json = json.loads(role_update) 

400 try: 

401 role = role_update_json[0]["name"] 

402 except IndexError: 

403 return None 

404 with suppress(ValueError): 

405 return SotransRole(role) 

406 return None 

407 

408 

409async def on_add_role(data: dict): 

410 res_path: str = data["resourcePath"] 

411 up_user_id = uuid.UUID(res_path.split("/")[1]) 

412 role_enum = parse_role(data) 

413 if not role_enum: 

414 return 

415 user = await users_col.find_single("id", up_user_id) 

416 if not user: 

417 log_warning( 

418 f"No user with uuid {up_user_id} for role {role_enum.value}" 

419 ) 

420 return 

421 user_model = SotransUserDBModel(**user) 

422 

423 if role_enum not in user_model.roles: 

424 user_model.roles.append(role_enum) 

425 most_role = get_most_role(user_model.roles) 

426 await users_col.collection.update_one( 

427 {SotransUserDBFieldsModel.id: up_user_id}, 

428 { 

429 "$set": {SotransUserDBFieldsModel.role: most_role}, 

430 "$push": {SotransUserDBFieldsModel.roles[0]: role_enum.value}, 

431 }, 

432 ) 

433 user_model.role = most_role 

434 await update_user(up_user_id, user_model) 

435 

436 

437class UserUpdateTimestamp(BaseModel): 

438 user_id: PydanticUUID 

439 timestamp: float 

440 

441 

442async def check_ts(data: dict, user_id: uuid.UUID): 

443 ts = data.get("proxy_timestamp") 

444 if not ts: 

445 return False 

446 user_ts = await users_webhook_col.find_single("user_id", user_id) 

447 if user_ts is None: 

448 uts_model = UserUpdateTimestamp(user_id=user_id, timestamp=ts) 

449 await users_webhook_col.create(uts_model.model_dump()) 

450 return True 

451 user_ts_model = UserUpdateTimestamp(**user_ts) 

452 if user_ts_model.timestamp > ts: 

453 return False 

454 await users_webhook_col.collection.update_one( 

455 {"user_id": user_id, "timestamp": ts} 

456 ) 

457 return True 

458 

459 

460async def on_user_update_wh(data: dict): 

461 group_notification(f"#user-update \n{data}", mode=GROUP_TYPE.INFO) 

462 python_payload = json.loads(data["representation"]) 

463 user_update_data = UpdateKeycloakUserWebhookModel(**python_payload) 

464 if not await check_ts(data, user_update_data.id): 

465 return 

466 previous_user = await users_col.find_single("id", user_update_data.id) 

467 if not previous_user: 

468 return 

469 roles = previous_user.get("roles") or [] 

470 pu_model = SotransUserDBModel(**previous_user) 

471 to_update_vector = ( 

472 True 

473 if pu_model.surname != user_update_data.surname 

474 or pu_model.name != user_update_data.name 

475 or pu_model.patronymic != user_update_data.patronymic 

476 else False 

477 ) 

478 user = SotransUserDBModel( 

479 **user_update_data.model_dump(), roles=roles, role=get_most_role(roles) 

480 ) 

481 user.text_search = get_users_text_search(user) 

482 previous_user_model = SotransUserDBModel(**previous_user) 

483 change_subsidiary = previous_user_model.subsidiary_id != user.subsidiary_id 

484 if change_subsidiary: 

485 if user.subsidiary_id is not None: 

486 with suppress(InvalidId): 

487 soid = ObjectId(user.subsidiary_id) 

488 subsidiary_name_holder = await subsidiaries_col.find_single( 

489 "_id", soid, projection={"name": 1} 

490 ) 

491 if subsidiary_name_holder: 

492 user.subsidiary_name = subsidiary_name_holder["name"] 

493 else: 

494 user.subsidiary_name = None 

495 await users_col.collection.update_one( 

496 {SotransUserDBFieldsModel.id: user_update_data.id}, 

497 {"$set": user.model_dump()}, 

498 ) 

499 

500 await update_user( 

501 user_update_data.id, 

502 user, 

503 change_subsidiary=change_subsidiary 

504 and previous_user_model.subsidiary_id is not None, 

505 update_vector_ts=to_update_vector, 

506 ) 

507 

508 

509async def on_remove_role(data: dict): 

510 res_path: str = data["resourcePath"] 

511 up_user_id = uuid.UUID(res_path.split("/")[1]) 

512 role_enum = parse_role(data) 

513 if not role_enum: 

514 return 

515 user = await users_col.find_single("id", up_user_id) 

516 if not user: 

517 return 

518 user_model = SotransUserDBModel(**user) 

519 if role_enum in user_model.roles: 

520 user_model.roles.remove(role_enum) 

521 user_model.role = get_most_role(user_model.roles) 

522 await users_col.collection.update_one( 

523 {SotransUserDBFieldsModel.id: up_user_id}, 

524 { 

525 "$set": {SotransUserDBFieldsModel.role: user_model.role}, 

526 "$pull": {SotransUserDBFieldsModel.roles[0]: role_enum.value}, 

527 }, 

528 ) 

529 await update_user(up_user_id, user_model) 

530 

531 

532async def on_user_delete(data: dict): 

533 res_path: str = data["resourcePath"] 

534 user_id = uuid.UUID(res_path.split("/")[1]) 

535 await users_col.collection.delete_one( 

536 {SotransUserDBFieldsModel.id: user_id} 

537 ) 

538 await remove_user_from_deps(user_id) 

539 

540 

541async def on_user_create(data: dict): 

542 res_path: str = data["resourcePath"] 

543 user_id = uuid.UUID(res_path.split("/")[1]) 

544 python_payload = json.loads(data["representation"]) 

545 user_update_data = UpdateKeycloakUserWebhookModel(**python_payload) 

546 user_update_data.id = user_id 

547 user = SotransUserDBModel(**user_update_data.model_dump()) 

548 user.text_search = get_users_text_search(user) 

549 await users_col.collection.insert_one(user.model_dump()) 

550 if user.subsidiary_id is None: 

551 return 

552 try: 

553 sub_id = ObjectId(user.subsidiary_id) 

554 except InvalidId: 

555 return 

556 try: 

557 subsidiary = await subsidiaries_col.collection.find_one_and_update( 

558 {"_id": sub_id}, 

559 {"$push": {SubsidiaryDBModel.employees[0]: user.model_dump()}}, 

560 return_document=ReturnDocument.AFTER, 

561 ) 

562 except OperationFailure: 

563 subsidiary = await subsidiaries_col.collection.find_one_and_update( 

564 {"_id": sub_id}, 

565 {"$set": {SubsidiaryDBModel.employees[0]: [user.model_dump()]}}, 

566 return_document=ReturnDocument.AFTER, 

567 ) 

568 if subsidiary: 

569 subs_model = SubsidiaryDBModel(**subsidiary) 

570 await update_subsidiary(subs_model) 

571 

572 

573async def on_user_register(data: dict): 

574 user_id = data["userId"] 

575 for t in range(60): 

576 try: 

577 user_data = await idp.get_user(user_id) 

578 except UserNotFound: 

579 await asyncio.sleep(0.7) 

580 else: 

581 log_warning(f"{user_id} registered on try {t}") 

582 break 

583 else: 

584 log_error(f"Failed to find user, webhook data:\n{data}") 

585 return 

586 to_create = SotransUserDBModel(**user_data.model_dump()) 

587 to_create.text_search = get_users_text_search(to_create) 

588 await users_col.collection.insert_one(to_create.model_dump()) 

589 

590 

591async def keycloak_webhook_handler(data: dict): 

592 event_type = data.get("type") 

593 if event_type: 

594 match event_type: 

595 case "USER-UPDATE": 

596 await on_user_update_wh(data) 

597 case "REALM_ROLE_MAPPING-CREATE": 

598 await asyncio.sleep(3) 

599 await on_add_role(data) 

600 case "REALM_ROLE_MAPPING-DELETE": 

601 await on_remove_role(data) 

602 case "USER-CREATE": 

603 await on_user_create(data) 

604 case "REGISTER": 

605 asyncio.create_task(on_user_register(data)) 

606 case "USER-DELETE": 

607 await on_user_delete(data) 

608 

609 

610async def on_get_assigned_to_employee_clients( 

611 user_id: uuid.UUID, params: BaseGetListQueryParams 

612) -> GenericGetListResponse[ClientDBModel]: 

613 assigned_clients = await cm_clients_col.find_single( 

614 ManagersClientsDBModel.manager_id, user_id 

615 ) 

616 if not assigned_clients: 

617 return GenericGetListResponse[ClientDBModel](items=[], total=0) 

618 clients_ids = ManagersClientsDBModel(**assigned_clients).clients_ids 

619 params.where = adjust_search_query( 

620 params.where, "id", {str(ci) for ci in clients_ids} 

621 ) 

622 return await clients_grabber.get_list(params) 

623 

624 

625async def on_get_assigned_orders_to_employee( 

626 user_id: uuid.UUID, params: BaseGetListQueryParams 

627) -> GenericGetListResponse[OrderDBModel]: 

628 params.where = update_search_query( 

629 params.where, {OrderDBModel.assigned.company.employee.id: str(user_id)} 

630 ) 

631 return await orders_data_grabber.get_list(params) 

632 

633 

634def parse_user_roles(roles: list[KeycloakRoleModel]) -> list[str]: 

635 result = [] 

636 for role in roles: 

637 with suppress(ValueError): 

638 result.append(SotransRole(role.name).value) 

639 return result 

640 

641 

642async def users_sync_batch_processing( 

643 users_batch: list[SotransKeycloakUserModel], 

644): 

645 for user in users_batch: 

646 user_roles = await idp.get_user_roles(str(user.id)) 

647 role_names = parse_user_roles(user_roles) 

648 most_role = get_most_role(role_names) 

649 try: 

650 if user.subsidiary_id is None: 

651 raise InvalidId 

652 sub_id = ObjectId(user.subsidiary_id) 

653 except (InvalidId, TypeError): 

654 sub_name = None 

655 else: 

656 sub = await subsidiaries_col.find_single( 

657 "_id", sub_id, projection={"name": 1} 

658 ) 

659 if sub is None: 

660 sub_name = None 

661 else: 

662 sub_name = sub["name"] 

663 db_user = SotransUserDBModel( 

664 **{**user.model_dump(), **dict(roles=role_names, role=most_role)}, 

665 subsidiary_name=sub_name, 

666 ) 

667 db_user.text_search = get_users_text_search(db_user) 

668 await users_col.collection.update_one( 

669 {"id": user.id}, 

670 {"$set": db_user.model_dump()}, 

671 upsert=True, 

672 ) 

673 

674 

675async def iterate_for_sync(): 

676 limit = 200 

677 skip = 0 

678 timeout = idp.timeout 

679 idp.timeout = 150 

680 try: 

681 users = await idp.get_all_users(skip, limit) 

682 while users: 

683 await users_sync_batch_processing(users) 

684 skip += limit 

685 users = await idp.get_all_users(skip, limit) 

686 except Exception: 

687 log_error("Keycloak sync crash") 

688 # print("Sync fail") 

689 else: 

690 log_warning("Keycloak sync run finished successfully") 

691 # print("Sync success") 

692 finally: 

693 idp.timeout = timeout