Coverage for api/indexes.py: 24%
80 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1from os import environ
2from typing import Literal
4import config
5from bson import ObjectId
6from logging_config import logger
7from pymongo import MongoClient
8from sotrans_models.models.misc.client import ClientDBModel
9from sotrans_models.models.resources.drivers import DriverDBModel
10from sotrans_models.models.resources.trailers import TrailerDBModel
11from sotrans_models.models.resources.trucks import TruckDBModel
# Connection settings are read from the environment; both fall back to "".
MONGO_URI = environ.get("MONGO_URL", "")

DB_NAME = environ.get("MONGO_DB_NAME", "")


def _redact_uri_credentials(uri: str) -> str:
    """Return *uri* with any ``user:password@`` userinfo replaced by ``***@``.

    A URI without a ``scheme://`` prefix or without userinfo is returned
    unchanged. Only the authority part (before the first ``/``) is inspected,
    so an ``@`` inside the path or the options is left alone.
    """
    scheme, sep, rest = uri.partition("://")
    if not sep:
        return uri
    authority, slash, tail = rest.partition("/")
    _, at, hosts = authority.rpartition("@")
    if not at:
        return uri
    return f"{scheme}{sep}***@{hosts}{slash}{tail}"


# The connection string may embed credentials, so only a redacted form is
# written to the log; the client below still receives the full URI.
logger.info(
    f" Indexes MONGO_URI: {_redact_uri_credentials(MONGO_URI)},"
    f" MONGO_DB_NAME: {DB_NAME}"
)
mongo_client = MongoClient(
    MONGO_URI,
)

# Database handle shared by every function in this module.
db = mongo_client[DB_NAME]

IDX_VERSION = "0.2.0"
def indexes_fts():
    """Drop the existing indexes and rebuild the unique ones.

    Steps, in order:

    1. Every order collection loses its indexes, then each gets a unique
       index on ``id``.
    2. The remaining collections lose their indexes.
    3. ``api_keys`` gets unique indexes on ``name`` and ``api_key``.
    4. Drivers, trucks, trailers and clients are de-duplicated by the
       ``migrate_for_unique_*`` helpers (which delete documents) and then
       receive their unique indexes.
    """
    order_collections = (
        "exchange_orders",
        "buffer_orders",
        "trash_orders",
        "archive",
    )
    # All order collections are stripped first; only then is any re-indexed.
    for name in order_collections:
        db[name].drop_indexes()
    for name in order_collections:
        db[name].create_index("id", unique=True)

    for name in (
        "organizations",
        "drivers",
        "trucks",
        "trailers",
        "clients",
        "documents",
        "subsidiaries",
        "users",
        "api_keys",
    ):
        db[name].drop_indexes()

    api_keys = db["api_keys"]
    api_keys.create_index("name", unique=True)
    api_keys.create_index("api_key", unique=True)

    # Duplicates must be removed before a unique index can be built.
    migrate_for_unique_drivers()
    driver_key = [
        DriverDBModel.passport.number,
        DriverDBModel.passport.series,
        DriverDBModel.organization_id,
    ]
    db["drivers"].create_index(driver_key, unique=True)

    for name, model in (("trucks", TruckDBModel), ("trailers", TrailerDBModel)):
        migrate_for_unique_vehicles(name)
        db[name].create_index(model.license_plate, unique=True)

    migrate_for_unique_complex_key_clients()
    client_key = [ClientDBModel.inn, ClientDBModel.kpp]
    db["clients"].create_index(client_key, unique=True)
def migrate_for_unique_complex_key_clients():
    """Delete clients that duplicate another client's (inn, kpp) pair.

    Clients are visited one at a time. The visited client is kept and every
    other document with the same ``inn`` and ``kpp`` values is deleted. The
    loop ends once no unvisited client remains.
    """
    clients = db["clients"]
    # NOTE(review): this list grows by one id per kept client and is sent
    # with every query; may become costly on large collections.
    visited_ids = []
    while (kept := clients.find_one({"_id": {"$nin": visited_ids}})) is not None:
        kept_id = kept.get("_id")
        duplicates_filter = {
            "_id": {"$ne": kept_id},
            ClientDBModel.inn: kept.get(ClientDBModel.inn),
            ClientDBModel.kpp: kept.get(ClientDBModel.kpp),
        }
        clients.delete_many(duplicates_filter)
        visited_ids.append(kept_id)
def migrate_for_unique_vehicles(col_name: Literal["trucks", "trailers"]):
    """Delete vehicles that share a license plate with another vehicle.

    Args:
        col_name: Collection to de-duplicate, ``"trucks"`` or ``"trailers"``.

    Vehicles are visited one at a time. The visited vehicle is kept and every
    other document in the collection with the same license plate value is
    deleted. The loop ends once no unvisited vehicle remains.
    """
    # Use the field of the model that matches the collection, so the
    # de-duplication key is the same one the unique index is built on.
    model = TruckDBModel if col_name == "trucks" else TrailerDBModel
    plate_field = model.license_plate

    collection = db[col_name]
    checked_ids: list[ObjectId] = []
    while True:
        veh = collection.find_one({"_id": {"$nin": checked_ids}})
        if not veh:
            return
        collection.delete_many(
            {
                "_id": {"$ne": veh["_id"]},
                plate_field: veh.get(plate_field),
            }
        )
        checked_ids.append(veh["_id"])
def migrate_for_unique_drivers():
    """Delete drivers that would violate the unique passport/organization key.

    Drivers are visited one at a time:

    * A driver whose passport value is not a dict (including a missing or
      null passport) is deleted.
    * Otherwise the driver is kept and every other driver with the same
      passport series, passport number and organization id is deleted.

    The loop ends once no unvisited driver remains.
    """
    drivers = db["drivers"]
    checked_ids = []
    while True:
        driver = drivers.find_one({"_id": {"$nin": checked_ids}})
        if not driver:
            return
        # .get() instead of indexing: a document lacking the field must not
        # abort the whole migration with KeyError; absent is handled as None.
        passport = driver.get(DriverDBModel.passport)
        if not isinstance(passport, dict):
            # Not added to checked_ids: the document no longer exists.
            drivers.delete_one({"_id": driver["_id"]})
            continue
        org_id = driver.get(DriverDBModel.organization_id)
        series = passport.get("series")
        number = passport.get("number")
        drivers.delete_many(
            {
                "_id": {
                    "$ne": driver["_id"],
                },
                DriverDBModel.passport.series: series,
                DriverDBModel.passport.number: number,
                DriverDBModel.organization_id: org_id,
            }
        )
        checked_ids.append(driver["_id"])
def run_indexes(version: str):
    """Rebuild the indexes when the stored index version differs from *version*.

    Args:
        version: Index version the caller expects to be applied.

    Does nothing when ``config.TESTING`` is truthy. Otherwise the
    ``{"target": "indexes"}`` record in the ``versions`` collection is read;
    if it is absent or holds another version, ``indexes_fts()`` runs and the
    record is set to *version*.
    """
    if config.TESTING:
        return
    record = db["versions"].find_one({"target": "indexes"})
    if record and record.get("version") == version:
        return
    indexes_fts()
    # Update in place (creating the record if needed) rather than inserting a
    # new document each time: with several records, find_one keeps returning
    # a stale one and the rebuild would run on every start. update_many also
    # repairs any duplicate records left by earlier runs.
    db["versions"].update_many(
        {"target": "indexes"},
        {"$set": {"version": version}},
        upsert=True,
    )