Coverage for api/database/integration_1c/integration_mappers.py: 17%
91 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import datetime
2from contextlib import suppress
3from typing import Any
5from bson import ObjectId
6from bson.errors import InvalidId
7from sotrans_models.models.misc.client import ClientDBModel
8from sotrans_models.models.misc.verification import VerificationStatus
9from sotrans_models.models.orders.order import OrderDBModel, StopType
10from sotrans_models.models.organizations import (
11 OrganizationDBModel,
12 SubsidiaryDBModel,
13)
14from sotrans_models.models.resources.drivers import DriverDBModel, DriverStatus
15from sotrans_models.models.resources.trailers import TrailerDBModel
16from sotrans_models.models.resources.trucks import LoadingType, TruckDBModel
17from sotrans_models.models.users import SotransUserDBModel
18from sotrans_models.utils.text_mappers import populate_if_filled
21def make_vehicle_1c(
22 vehicle: TruckDBModel | TrailerDBModel | dict, is_truck: bool = True
23) -> dict[str, Any]:
24 if isinstance(vehicle, dict):
25 vehicle = (
26 TruckDBModel(**vehicle) if is_truck else TrailerDBModel(**vehicle)
27 )
28 side_load = (
29 LoadingType.side in vehicle.body.loading_type
30 or LoadingType.any in vehicle.body.loading_type
31 if vehicle.body.loading_type
32 else False
33 )
34 top_load = (
35 LoadingType.top in vehicle.body.loading_type
36 or LoadingType.any in vehicle.body.loading_type
37 if vehicle.body.loading_type
38 else False
39 )
41 verification = None
42 if vehicle.verification:
43 verification = (
44 "Принято"
45 if vehicle.verification.status == VerificationStatus.accepted
46 else "Отклонено"
47 )
48 return {
49 "Верификация": verification,
50 "ИдентификаторБиржа": str(vehicle.id),
51 "Комментарий": vehicle.note,
52 "БоковаяЗагрузка": side_load,
53 "ВерхняяЗагрузка": top_load,
54 "НомерСТС": vehicle.sts_number,
55 "РазрешеннаяМаксМасса": vehicle.body.weight,
56 "ТипТС": "тягач" if is_truck else "прицеп",
57 }
60async def make_org_1c(
61 organization: OrganizationDBModel | dict,
62) -> dict[str, Any]:
63 if isinstance(organization, dict):
64 organization = OrganizationDBModel(**organization)
65 verification = None
66 if organization.verification:
67 verification = (
68 "Принято"
69 if organization.verification.status == VerificationStatus.accepted
70 else "Отклонено"
71 )
72 contact_info_table = []
73 if organization.phone:
74 contact_info_table.append({"НомерТелефона": organization.phone})
75 if organization.email:
76 contact_info_table.append({"АдресЭП": organization.email})
77 org_1c = {
78 "Верификация": verification,
79 "ИдентификаторБиржа": str(organization.id),
80 "ИНН": organization.inn,
81 "НаименованиеПолное": organization.name,
82 "ДатаРегистрации": organization.registration_date,
83 "ДатаСоздания": organization.created_at,
84 "КонтактнаяИнформация": contact_info_table,
85 }
86 if organization.contact_user:
87 org_1c["ОсновноеКонтактноеЛицо"] = await make_user_1c(
88 organization.contact_user
89 )
90 return org_1c
93def make_subsidiary_1c(subsidiary: SubsidiaryDBModel) -> dict[str, Any]:
94 return {
95 "НаименованиеВКазначействе": subsidiary.name,
96 "ИдентификаторБиржа": str(subsidiary.id),
97 }
100async def make_user_1c(
101 user: SotransUserDBModel | dict,
102 subsidiary: SubsidiaryDBModel | None = None,
103) -> dict[str, Any]:
104 if isinstance(user, dict):
105 user = SotransUserDBModel(**user)
106 if subsidiary is None and user.subsidiary_id:
107 with suppress(InvalidId):
108 sub_id = ObjectId(user.subsidiary_id)
109 subsidiary = SubsidiaryDBModel(
110 name=user.subsidiary_name, id=sub_id
111 )
112 contact_info_table = []
113 if user.phone:
114 contact_info_table.append({"НомерТелефона": user.phone})
115 if user.email:
116 contact_info_table.append({"АдресЭП": user.email})
117 return {
118 "Подразделение": None
119 if subsidiary is None
120 else make_subsidiary_1c(subsidiary),
121 "КонтактнаяИнформация": contact_info_table,
122 }
125def make_client_1c(client: ClientDBModel | dict) -> dict[str, Any]:
126 if isinstance(client, dict):
127 client = ClientDBModel(**client)
128 return {
129 "ИдентификаторБиржа": str(client.id),
130 "НаименованиеПолное": client.name or "",
131 "ИНН": client.inn,
132 "КПП": client.kpp,
133 "ОсновнойБанковскийСчет": client.correspondent_account,
134 "Комментарий": client.note,
135 }
138def make_driver_1c(driver: DriverDBModel | dict) -> dict[str, Any]:
139 if isinstance(driver, dict):
140 driver = DriverDBModel(**driver)
141 verification = None
142 if driver.verification:
143 verification = (
144 "Принято"
145 if driver.verification.status == VerificationStatus.accepted
146 else "Отклонено"
147 )
148 fio_container: list[str] = []
149 for f in (driver.surname, driver.name, driver.patronymic):
150 populate_if_filled(f, fio_container)
151 fio = " ".join(fio_container)
152 initials = "".join(letter[0] for letter in fio_container).upper()
153 driver_1c = {
154 "Верификация": verification,
155 "ИдентификаторБиржа": str(driver.id),
156 "ФИО": fio,
157 "Блокировка": driver.status == DriverStatus.blocked,
158 "Примечание": driver.note,
159 "Фамилия": driver.surname,
160 "Имя": driver.name,
161 "Отчество": driver.patronymic or "",
162 "Инициалы": initials,
163 "МестоРождения": driver.passport.birthplace,
164 "ДатаРождения": driver.passport.birthdate,
165 "ИНН": driver.inn,
166 "ДатаРегистрации": driver.created_at,
167 }
168 if driver.phone:
169 driver_1c["КонтактнаяИнформация"] = [{"НомерТелефона": driver.phone}]
170 return driver_1c
173def get_real_loading_dt(order: OrderDBModel) -> datetime.datetime | None:
174 if not order.completed_stages:
175 return None
176 for completed_stage in order.completed_stages:
177 if completed_stage.id == 1:
178 return completed_stage.timestamp
179 return None
182def get_real_unloading_dt(order: OrderDBModel) -> datetime.datetime | None:
183 if not (order.stops and order.completed_stages):
184 return None
185 last_stop_idx = len(order.stops)
186 for completed_stage in order.completed_stages:
187 if last_stop_idx == completed_stage.id:
188 return completed_stage.timestamp
189 return None
192def make_order_1c(order: OrderDBModel | dict) -> dict[str, Any]:
193 if isinstance(order, dict):
194 order = OrderDBModel(**order)
195 return {
196 "ИдентификаторБиржа": str(order.id),
197 "Агент": make_client_1c(order.client),
198 "АдресДоставки": order.stops[-1].address.line if order.stops else "",
199 "ВалютаДокумента": order.currency,
200 "Водитель": make_driver_1c(order.driver),
201 "ДатаПогрузки": order.stops[0].datetime if order.stops else None,
202 "ДатаПогрузкиФакт": get_real_loading_dt(order),
203 "ДатаРазгрузки": order.stops[-1].datetime if order.stops else None,
204 "ДатаРазгрузкиФакт": get_real_unloading_dt(order),
205 "ДлинаПути": order.route_km,
206 "Тягач": make_vehicle_1c(order.truck),
207 "Комментарий": order.note,
208 "ДатаСоздания": order.created_at,
209 "Аукцион": bool(order.best_bid),
210 "Погрузка": [
211 s.address.line
212 for s in order.stops or []
213 if s.stop_type == StopType.loading
214 ],
215 "Разгрузка": [
216 s.address.line
217 for s in order.stops or []
218 if s.stop_type == StopType.loading
219 ],
220 "Грузы": [{"ОписаниеГруза": order.cargo_type}],
221 }