Coverage for api/utils/excel.py: 13%
397 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import functools
2import string
3import uuid
4from contextlib import suppress
5from io import BytesIO
6from typing import Any, Callable, Iterator, Sequence
8import aiofiles
9import config
10import pytz
11from bson import ObjectId
12from bson.errors import InvalidId
13from config import DEFAULT_RATE_STEP
14from database.text_search.mongo_search import build_search_query
15from dbcc import MongoTableEngine
16from errors import log_error
17from exceptions import BadParameterHTTPError, NoAccessHTTPError
18from fastapi import UploadFile
19from handlers.authorization.check_role import has_role
20from handlers.orders.buffer_orders import on_create_order, on_get_buffer
21from mongodb import clients_col, drivers_col, trailers_col, trucks_col
22from openpyxl.reader.excel import load_workbook
23from openpyxl.workbook import Workbook
24from openpyxl.worksheet.worksheet import Worksheet
25from services.opensearch_client import opensearch_adapter
26from sotrans_models.body_props import BODY_TYPES_MATCH, LOADING_TYPES_MATCH
27from sotrans_models.models._base import InsertByOIDModel, InsertByUUIDModel
28from sotrans_models.models.orders.order import (
29 AssignedCreateModel,
30 AssignedFromCompanyCreate,
31 OrderCreateModel,
32 OrderDBModel,
33 StopAddress,
34 StopModel,
35 StopType,
36)
37from sotrans_models.models.resources.trucks import (
38 BodySettingsModel,
39 BodyType,
40 LoadingType,
41)
42from sotrans_models.models.responses import GenericGetListResponse
43from sotrans_models.models.roles import SotransRole
44from sotrans_models.models.users import SotransOIDCUserModel
45from utils.concurrency import threadpool
46from utils.data_grabber import BaseGetListQueryParams
47from utils.dt_utils import ensure_dt, parse_datetime_str, parse_datetime_to_str
49XLS_IO_FIELD_NAMES = (
50 "Номер заказа",
51 "Краткий номер",
52 "*Цена клиента",
53 "*Конец аукциона",
54 "Стартовая цена",
55 "Шаг ставки",
56 "Конечная цена",
57 "*Дата и время первой погрузки",
58 "*Место первой погрузки",
59 "*Дата и время последней разгрузки",
60 "*Место последней разгрузки",
61 "Доп остановки",
62 "*Клиент",
63 "Груз",
64 "Тип кузова",
65 "Тип погрузки",
66 "Тоннаж",
67 "Объем",
68 "Примечания",
69 "Ответственный филиал",
70 "Ответственный логист",
71 "Перевозчик",
72 "Водитель",
73 "Транспорт",
74 "Прицеп",
75)
77(
78 NO,
79 SHORT_NO,
80 CLIENT_PRICE,
81 AUC_END_TIME,
82 START_PRICE,
83 RATE_STEP,
84 END_PRICE,
85 FIRST_LOADING_DT,
86 FIRST_LOADING_PLACE,
87 LAST_UNLOADING_DT,
88 LAST_UNLOADING_PLACE,
89 ADDITIONAL_STOPS,
90 CLIENT,
91 CARGO,
92 BODY_TYPE,
93 LOADING_TYPE,
94 WEIGHT,
95 VOLUME,
96 NOTES,
97 SUBSIDIARY,
98 LOGISTICIAN,
99 CARRIER,
100 DRIVER,
101 TRUCK,
102 TRAILER,
103) = XLS_IO_FIELD_NAMES
105VARIANTS = "варианты;варианты"
107HEADER_COMMENTS = {
108 ADDITIONAL_STOPS: "датавремя;тип;место|датавремя;тип;место",
109 BODY_TYPE: VARIANTS,
110 LOADING_TYPE: VARIANTS,
111 SUBSIDIARY: "Первый по названию, либо номер",
112 LOGISTICIAN: "Первый по фио, либо номер",
113 CARRIER: "Первый по наименованию, инн, либо номер в системе",
114 DRIVER: "Первый по фио, либо номер",
115 TRUCK: "Первый по гос номеру, либо номер в системе",
116 TRAILER: "Первый по гос номеру, либо номер в системе",
117 CLIENT: "Первый по названию, либо номер в системе",
118}
121class ExcelExporter:
122 workdir_path = "./assets"
124 def __init__(
125 self,
126 title: str,
127 heading: tuple[list[str], list[str | None]],
128 heading_cells_merge: list[str],
129 formatter: Callable[[Any], list[Any]],
130 ):
131 self.heading_cells_merge = heading_cells_merge
132 self.heading = heading
133 self.formatter = formatter
134 self.title = title
136 async def write_excel(self, data: list[Any]):
137 workbook = Workbook()
138 sheet = workbook.active
139 sheet.title = self.title
140 self.header_maker(sheet)
141 for item in data:
142 try:
143 sheet.append(self.formatter(item))
144 except Exception:
145 log_error("error on formatting excel output")
146 continue
147 file_path = f"{self.workdir_path}/{uuid.uuid4()}.xlsx"
148 await self.save_workbook(file_path, workbook)
149 return file_path
151 @threadpool
152 def save_workbook(self, file_path: str, workbook: Workbook):
153 workbook.save(file_path)
155 def header_maker(self, sheet: Worksheet):
156 for line in self.heading:
157 sheet.append(line)
158 for cell_letter in self.heading_cells_merge:
159 sheet.merge_cells(f"{cell_letter}1:{cell_letter}2")
162def get_column_index(
163 field_name: str,
164 field_names: list[str],
165) -> int:
166 """We put absent element allways in the tail of result and in the end truncate this cell"""
167 with suppress(ValueError):
168 return field_names.index(field_name)
169 return -1
172def get_first_loading(model_stops: list[StopModel]) -> tuple[str | None, ...]:
173 f_l_dt = None
174 f_l_address = None
175 first_stop_index = 0
176 with suppress(IndexError):
177 first_stop = model_stops[first_stop_index]
178 while first_stop.stop_type == StopType.unloading:
179 first_stop_index += 1
180 first_stop = model_stops[first_stop_index]
181 model_stops.pop(first_stop_index)
182 f_l_dt = (
183 parse_datetime_to_str(first_stop.datetime)
184 if first_stop.datetime
185 else None
186 )
187 f_l_address = (
188 first_stop.address.location_address or first_stop.address.line
189 )
190 return f_l_dt, f_l_address
193def get_last_stop(model_stops: list[StopModel]) -> tuple[str | None, ...]:
194 l_u_dt = None
195 l_u_address = None
196 last_stop_index = -1
197 with suppress(IndexError):
198 last_stop = model_stops[last_stop_index]
199 while last_stop.stop_type == StopType.loading:
200 last_stop_index -= 1
201 last_stop = model_stops[last_stop_index]
202 model_stops.pop(last_stop_index)
203 l_u_dt = (
204 parse_datetime_to_str(last_stop.datetime)
205 if last_stop.datetime
206 else None
207 )
208 l_u_address = (
209 last_stop.address.location_address or last_stop.address.line
210 )
211 return l_u_dt, l_u_address
214def get_additional_stops(model_stops: list[StopModel]) -> str:
215 additional_stops = ""
216 as_flag = False
217 for stop in model_stops:
218 if as_flag:
219 additional_stops += "|"
220 additional_stops += (
221 parse_datetime_to_str(stop.datetime) if stop.datetime else "--"
222 )
223 additional_stops += ";"
224 additional_stops += (
225 "погрузка" if stop.stop_type == StopType.loading else "разгрузка"
226 )
227 additional_stops += ";"
228 additional_stops += stop.address.location_address or stop.address.line
229 if not as_flag:
230 as_flag = True
231 return additional_stops
234def get_body_type(body_types_in: list[BodyType]) -> str:
235 body_types = ""
236 at_list_one = False
237 reverse_bt = {i[1]: i[0] for i in BODY_TYPES_MATCH.items()}
238 for bt in body_types_in:
239 bt_rus = reverse_bt.get(bt)
240 if bt_rus:
241 if at_list_one:
242 body_types += ";"
243 body_types += bt_rus
244 if not at_list_one:
245 at_list_one = True
246 return body_types
249def get_loading_type(loading_type_in: list[LoadingType] | None) -> str:
250 loading_type = ""
252 if not loading_type_in:
253 return loading_type
254 reverse_loading = {i[1]: i[0] for i in LOADING_TYPES_MATCH.items()}
255 at_list_one_loading = False
256 for lt in loading_type_in:
257 lt_rus = reverse_loading.get(lt)
258 if lt_rus:
259 if at_list_one_loading:
260 loading_type += ";"
261 loading_type += lt_rus
262 if not at_list_one_loading:
263 at_list_one_loading = True
264 return loading_type
267def get_driver_fio(model: OrderDBModel) -> str:
268 driver_fio = ""
269 if not model.driver:
270 return driver_fio
271 if model.driver.surname:
272 driver_fio += model.driver.surname
273 driver_fio += " "
274 if model.driver.name:
275 driver_fio += model.driver.name
276 driver_fio += " "
277 if model.driver.patronymic:
278 driver_fio += model.driver.patronymic
279 return driver_fio
282def get_employee_full_name(model: OrderDBModel) -> str:
283 employee = (
284 model.assigned
285 and model.assigned.company
286 and model.assigned.company.employee
287 )
288 efio = ""
289 if employee:
290 if employee.surname:
291 efio += employee.surname
292 efio += " "
293 if employee.name:
294 efio += employee.name
295 efio += " "
296 if employee.patronymic:
297 efio += employee.patronymic
298 if not efio:
299 efio = str(employee.id)
300 return efio
303async def export_orders_to_excel(buffer_models: list[OrderDBModel], cut: bool):
304 fields = list(XLS_IO_FIELD_NAMES)
305 if cut:
306 fields.remove(CLIENT)
307 fields.remove(CLIENT_PRICE)
308 index_getter: Callable[[str], int] = functools.partial(
309 get_column_index, field_names=fields
310 )
311 header_comments: list[str | None] = [None] * (len(fields) + 1)
312 merges: list[str | None] = list(string.ascii_uppercase)
313 for k in HEADER_COMMENTS:
314 i = get_column_index(k, fields)
315 header_comments[i] = HEADER_COMMENTS[k]
316 merges[i] = None
317 header_comments = header_comments[:-1]
318 merges = merges[: len(header_comments)]
319 merges = list(filter(None, merges))
321 def order_formatter(model: OrderDBModel):
322 line: list[Any] = [None] * (len(fields) + 1)
324 line[index_getter(NO)] = str(model.id)
325 line[index_getter(SHORT_NO)] = str(model.id)[-6:]
326 line[index_getter(CLIENT_PRICE)] = model.client_price
327 line[index_getter(AUC_END_TIME)] = parse_datetime_to_str(
328 model.auction_end_time, tz=pytz.timezone("Europe/Moscow")
329 )
330 line[index_getter(START_PRICE)] = model.start_price
331 line[index_getter(RATE_STEP)] = model.rate_step
332 line[index_getter(END_PRICE)] = model.end_price
334 model_stops = model.stops
335 (
336 line[index_getter(FIRST_LOADING_DT)],
337 line[index_getter(FIRST_LOADING_PLACE)],
338 ) = get_first_loading(model_stops)
339 (
340 line[index_getter(LAST_UNLOADING_DT)],
341 line[index_getter(LAST_UNLOADING_PLACE)],
342 ) = get_last_stop(model_stops)
343 line[index_getter(ADDITIONAL_STOPS)] = get_additional_stops(
344 model_stops
345 )
346 line[index_getter(CLIENT)] = (
347 model.client.short_name
348 or model.client.name
349 or str(model.client.id)
350 )
351 line[index_getter(CARGO)] = model.cargo_type
353 line[index_getter(BODY_TYPE)] = get_body_type(
354 model.truck_body.body_type
355 )
357 line[index_getter(LOADING_TYPE)] = get_loading_type(
358 model.truck_body.loading_type
359 )
360 line[index_getter(WEIGHT)] = model.truck_body.weight
361 line[index_getter(VOLUME)] = model.truck_body.volume
362 line[index_getter(NOTES)] = model.note
363 line[index_getter(SUBSIDIARY)] = (
364 model.assigned
365 and model.assigned.company
366 and model.assigned.company.subsidiary
367 and model.assigned.company.subsidiary.name
368 )
369 line[index_getter(LOGISTICIAN)] = get_employee_full_name(model)
370 line[index_getter(CARRIER)] = (
371 model.carrier.short_name
372 or model.carrier.name
373 or str(model.carrier.id)
374 if model.carrier
375 else ""
376 )
377 driver_fio = get_driver_fio(model)
378 line[index_getter(DRIVER)] = (
379 driver_fio or str(model.driver.id) if model.driver else ""
380 )
381 line[index_getter(TRUCK)] = (
382 model.truck.sts_number if model.truck else ""
383 )
384 line[index_getter(TRAILER)] = (
385 model.trailer.sts_number if model.trailer else ""
386 )
387 line = line[:-1]
388 return line
390 exporter = ExcelExporter(
391 heading_cells_merge=merges, # type: ignore[arg-type]
392 title="Заказы в буфере",
393 heading=(fields, header_comments),
394 formatter=order_formatter,
395 )
396 return await exporter.write_excel(buffer_models)
399@threadpool
400def parse_excel_import(excel_path: str):
401 workbook: Workbook = load_workbook(excel_path)
402 sheet: Worksheet = workbook.active
403 sheet_data = list(sheet.values)
404 return sheet_data
407KB = 1024
410async def save_excel(xls_file: UploadFile):
411 workdir_path = "./assets"
412 file_path = f"{workdir_path}/{xls_file.filename}"
413 try:
414 async with aiofiles.open(file_path, "wb") as f:
415 while contents := xls_file.file.read(KB):
416 await f.write(contents)
417 except Exception:
418 log_error()
419 raise BadParameterHTTPError
420 finally:
421 xls_file.file.close()
422 return file_path
425async def export_buffer(
426 user: SotransOIDCUserModel, params: BaseGetListQueryParams
427):
428 buffer = await on_get_buffer(user, params)
429 to_cut = not has_role(user, SotransRole.company_logistician)
430 export_file = await export_orders_to_excel(buffer.items, to_cut)
431 return export_file
434def parse_stops(stops: str | None) -> list[StopModel]:
435 stop_data: list[StopModel] = []
436 if not stops:
437 return stop_data
438 stops_strs = stops.split("|")
439 for stop_str in stops_strs:
440 dt_str, stop_type_rus, address = stop_str.split(";")
441 dt = parse_datetime_str(dt_str)
442 stop_type = "loading" if stop_type_rus == "погрузка" else "unloading"
443 stop_data.append(
444 StopModel(
445 datetime=dt,
446 address=StopAddress(line=address),
447 stop_type=StopType(stop_type),
448 )
449 )
450 return stop_data
453def make_body(
454 load_types: str | None,
455 truck_types: str | None,
456 w: float | str | None,
457 v: int | str | None,
458):
459 system_loading_types: list[LoadingType] = []
460 if load_types:
461 for lts in load_types.split(";"):
462 ltm = LOADING_TYPES_MATCH.get(lts.strip().lower())
463 if ltm is None:
464 continue
465 system_loading_types.append(ltm)
467 system_body_types: list[BodyType] = []
468 if truck_types:
469 for bt in truck_types.split(";"):
470 btm = BODY_TYPES_MATCH.get(bt.strip().lower())
471 if btm is None:
472 continue
473 system_body_types.append(btm)
474 if w:
475 w = float(w)
476 if v:
477 v = int(v)
478 body = BodySettingsModel(
479 body_type=system_body_types,
480 loading_type=system_loading_types,
481 weight=w,
482 volume=v,
483 )
484 return body
487def get_column_data(
488 name: str, heading: dict[str, int], data_line: Sequence[str]
489) -> str | None:
490 index = heading.get(name)
491 if index is None:
492 return None
493 with suppress(IndexError):
494 return data_line[index]
495 return None
498async def get_resource_id(
499 collection: MongoTableEngine, search_q: str
500) -> ObjectId | None:
501 if not search_q:
502 return None
503 ids_in = await opensearch_adapter.vector_search(
504 f"{config.MONGO_DB_NAME}.{collection.collection_name}", search_q
505 )
506 if not ids_in:
507 query = build_search_query(search_q, collection.collection_name)
508 regex_search_res = await collection.collection.find_one(
509 query, projection={"_id": 1}
510 )
511 if regex_search_res is None:
512 return None
513 return regex_search_res["_id"]
514 return ids_in[0]
517async def get_client_id(column_getter: functools.partial) -> ObjectId:
518 client_data = column_getter(CLIENT)
519 try:
520 client_id = ObjectId(client_data)
521 except InvalidId:
522 client_ids = await opensearch_adapter.vector_search(
523 f"{config.MONGO_DB_NAME}.{clients_col.collection_name}",
524 client_data,
525 )
526 if not client_ids:
527 q = build_search_query(client_data, clients_col.collection_name)
528 client_regex_search_res = await clients_col.collection.find_one(
529 q, projection={"_id": 1}
530 )
531 if client_regex_search_res is None:
532 raise ValueError
533 client_id = client_regex_search_res["_id"]
534 else:
535 client_id = client_ids[0]
536 return client_id
539async def make_order_from_line(
540 column_getter: functools.partial,
541) -> OrderCreateModel:
542 stops = []
543 loading_dt = column_getter(FIRST_LOADING_DT)
544 loading_place = column_getter(FIRST_LOADING_PLACE)
545 stops.append(
546 StopModel(
547 datetime=ensure_dt(loading_dt),
548 address=StopAddress(line=loading_place),
549 stop_type=StopType.loading,
550 )
551 )
552 other_stops = column_getter(ADDITIONAL_STOPS)
553 stops.extend(parse_stops(other_stops))
554 unloading_dt = column_getter(LAST_UNLOADING_DT)
555 unloading_place = column_getter(LAST_UNLOADING_PLACE)
556 stops.append(
557 StopModel(
558 datetime=ensure_dt(unloading_dt),
559 stop_type=StopType.unloading,
560 address=StopAddress(line=unloading_place),
561 )
562 )
563 subsidiary_id: str | ObjectId | None = column_getter(SUBSIDIARY)
564 client_id = await get_client_id(column_getter)
565 if subsidiary_id:
566 subsidiary_id = ObjectId(subsidiary_id)
567 employee_id = column_getter(LOGISTICIAN)
568 if employee_id:
569 employee_id = uuid.UUID(employee_id) # type: ignore[assignment]
570 carrier = column_getter(CARRIER)
571 if carrier:
572 try:
573 carrier = ObjectId(carrier)
574 except InvalidId:
575 raise ValueError
576 assigned = AssignedCreateModel(
577 company=AssignedFromCompanyCreate(
578 subsidiary=InsertByOIDModel(id=subsidiary_id),
579 employee=InsertByUUIDModel(id=employee_id),
580 )
581 )
582 truck_body = make_body(
583 column_getter(LOADING_TYPE),
584 column_getter(BODY_TYPE),
585 column_getter(WEIGHT),
586 column_getter(VOLUME),
587 )
588 start_price = column_getter(START_PRICE)
589 if start_price:
590 start_price = int(start_price)
591 end_price = column_getter(END_PRICE)
592 client_price = column_getter(CLIENT_PRICE)
593 rate_step = column_getter(RATE_STEP)
594 if not rate_step:
595 rate_step = DEFAULT_RATE_STEP
596 truck_id = column_getter(TRUCK)
597 if truck_id:
598 try:
599 truck_id = ObjectId(truck_id)
600 except InvalidId:
601 truck_id = await get_resource_id(trucks_col, truck_id)
602 trailer_id = column_getter(TRAILER)
603 if trailer_id:
604 try:
605 trailer_id = ObjectId(trailer_id)
606 except InvalidId:
607 trailer_id = await get_resource_id(trailers_col, trailer_id)
608 driver_id = column_getter(DRIVER)
609 if driver_id:
610 try:
611 driver_id = ObjectId(driver_id)
612 except InvalidId:
613 driver_id = await get_resource_id(drivers_col, driver_id)
614 return OrderCreateModel(
615 client=InsertByOIDModel(id=client_id),
616 assigned=assigned,
617 carrier=InsertByOIDModel(id=carrier),
618 truck=InsertByOIDModel(id=truck_id),
619 driver=InsertByOIDModel(id=driver_id),
620 trailer=InsertByOIDModel(id=trailer_id),
621 auction_end_time=parse_datetime_str(
622 column_getter(AUC_END_TIME), "+03:00"
623 ),
624 truck_body=truck_body,
625 stops=stops,
626 cargo_type=column_getter(CARGO),
627 start_price=start_price,
628 end_price=end_price,
629 client_price=client_price,
630 rate_step=rate_step,
631 note=column_getter(NOTES),
632 )
635def check_line(column_getter: functools.partial):
636 return (
637 column_getter(FIRST_LOADING_DT)
638 and column_getter(FIRST_LOADING_PLACE)
639 and column_getter(LAST_UNLOADING_DT)
640 and column_getter(LAST_UNLOADING_PLACE)
641 and column_getter(CLIENT)
642 and column_getter(CLIENT_PRICE)
643 and column_getter(AUC_END_TIME)
644 )
647def process_data_header(data_iterator: Iterator) -> dict:
648 try:
649 header_line = next(data_iterator)
650 except StopIteration:
651 raise BadParameterHTTPError("Нет заголовка")
652 data_header = {}
653 possible_field_names = set(XLS_IO_FIELD_NAMES)
654 for index, column in enumerate(header_line):
655 if column in possible_field_names:
656 data_header[column] = index
657 with suppress(StopIteration):
658 next(data_iterator)
659 return data_header
662async def parse_order_data(data: list[tuple]) -> list[OrderCreateModel]:
663 data_iterator = iter(data)
664 data_header = process_data_header(data_iterator)
665 orders: list = []
666 all_errors: list = []
667 for i, item in enumerate(data_iterator, start=3):
668 column_getter = functools.partial(
669 get_column_data, heading=data_header, data_line=item
670 )
671 if not check_line(column_getter):
672 continue
673 try:
674 orders.append(await make_order_from_line(column_getter))
675 except ValueError:
676 log_error(f"line {i} validation error")
677 all_errors.append(i)
678 if all_errors:
679 errors = ", ".join(map(str, all_errors))
680 raise BadParameterHTTPError(f"ошибки в строках {errors}")
681 return orders
684async def on_import_to_buffer(
685 excel_file: BytesIO, user: SotransOIDCUserModel
686) -> GenericGetListResponse[OrderDBModel]:
687 data = await parse_excel_import(excel_file)
688 orders = await parse_order_data(data)
689 imported_orders = []
690 for order in orders:
691 try:
692 imp_order = await on_create_order(user, order)
693 except (BadParameterHTTPError, NoAccessHTTPError):
694 continue
695 imported_orders.append(imp_order)
696 return GenericGetListResponse[OrderDBModel](
697 items=imported_orders, total=len(imported_orders)
698 )