Coverage for api/endpoints/orders/canceled.py: 76%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1from typing import Annotated 

2 

3from fastapi import APIRouter, Depends 

4from handlers.authorization.company_employee_access import ( 

5 flexible_company_assertion_query, 

6) 

7from handlers.grabbers.orders import orders_data_grabber 

8from handlers.orders.cancellation import ( 

9 on_cancelled_to_appointment, 

10 on_cancelled_to_buffer, 

11 on_cancelled_to_confirmed, 

12 on_cancelled_to_exchange, 

13 on_utilize_canceled, 

14) 

15from keycloak import idp 

16from sotrans_models.models._mongo import PydanticObjectIdPath 

17from sotrans_models.models.orders.order import ( 

18 OrderDBModel, 

19 OrderStatus, 

20 OrderUpdateModel, 

21) 

22from sotrans_models.models.responses import ( 

23 ErrorRepr, 

24 GenericGetListResponse, 

25 MultiselectError, 

26) 

27from sotrans_models.models.roles import SotransRole 

28from sotrans_models.models.users import SotransOIDCUserModel 

29from utils.data_grabber import ( 

30 BaseGetListQueryParams, 

31 BaseGetOneQueryParams, 

32 update_search_query, 

33) 

34from utils.helper import multiexec 

35 

# Router for the canceled-order endpoints. Every route declared on it is
# served under the "/canceled-orders" prefix, and a 404 response is documented
# with the ErrorRepr model.
canceled_router = APIRouter(
    prefix="/canceled-orders",
    tags=["orders", "canceled"],
    responses={
        404: {"model": ErrorRepr},
    },
)

41 

42 

# PUT /canceled-orders/{order_ids}/to-trash
#
# Hands the raw `order_ids` path segment and the `on_utilize_canceled` handler
# to `multiexec` and returns whatever `multiexec` produces.
# The `_` parameter exists only to enforce authentication: the current user is
# resolved through `idp.get_current_user` with `company_logistician` as the
# required role, and the resolved user object is not used in the body.
# NOTE(review): `order_ids` presumably carries several ids in one segment;
# how it is split is decided inside `multiexec` -- confirm there.
@canceled_router.put("/{order_ids}/to-trash")
async def utilize_canceled(
    order_ids: str,
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
) -> MultiselectError:
    outcome = await multiexec(order_ids, on_utilize_canceled)
    return outcome

56 

57 

# PUT /canceled-orders/{order_ids}/to-exchange
#
# Forwards the raw `order_ids` path segment, the `on_cancelled_to_exchange`
# handler and the optional `order` payload to `multiexec`, returning its
# result unchanged.
# `_` only enforces authentication (`company_logistician` is the required
# role); the resolved user is not used in the body.
# `order` may be omitted, in which case `None` is passed through.
@canceled_router.put("/{order_ids}/to-exchange")
async def cancelled_to_exchange(
    order_ids: str,
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
    order: OrderUpdateModel | None = None,
) -> MultiselectError:
    outcome = await multiexec(order_ids, on_cancelled_to_exchange, order)
    return outcome

72 

73 

# PUT /canceled-orders/{order_ids}/to-buffer
#
# Forwards the raw `order_ids` path segment, the `on_cancelled_to_buffer`
# handler and the optional `order` payload to `multiexec`, returning its
# result unchanged.
# `_` only enforces authentication; the resolved user is not used in the body.
# NOTE(review): this route requires `company_manager`, whereas the sibling
# routes on this router require `company_logistician` -- confirm that the
# stricter/different role is intentional.
@canceled_router.put("/{order_ids}/to-buffer")
async def cancelled_to_buffer(
    order_ids: str,
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_manager]
            )
        ),
    ],
    order: OrderUpdateModel | None = None,
) -> MultiselectError:
    outcome = await multiexec(order_ids, on_cancelled_to_buffer, order)
    return outcome

88 

89 

# PUT /canceled-orders/{order_ids}/to-appointment
#
# Forwards the raw `order_ids` path segment, the `on_cancelled_to_appointment`
# handler and the optional `order` payload to `multiexec`, returning its
# result unchanged.
# `_` only enforces authentication (`company_logistician` is the required
# role); the resolved user is not used in the body.
# `order` may be omitted, in which case `None` is passed through.
@canceled_router.put("/{order_ids}/to-appointment")
async def cancelled_to_appointment(
    order_ids: str,
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
    order: OrderUpdateModel | None = None,
) -> MultiselectError:
    outcome = await multiexec(order_ids, on_cancelled_to_appointment, order)
    return outcome

104 

105 

# PUT /canceled-orders/{order_ids}/to-confirmed
#
# Forwards the raw `order_ids` path segment, the `on_cancelled_to_confirmed`
# handler and the `order` payload to `multiexec`, returning its result
# unchanged.
# `_` only enforces authentication (`company_logistician` is the required
# role); the resolved user is not used in the body.
# Unlike the other "to-*" routes on this router, `order` has no default here,
# so the update payload is mandatory.
@canceled_router.put("/{order_ids}/to-confirmed")
async def cancelled_to_confirmed(
    order_ids: str,
    _: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
    order: OrderUpdateModel,
) -> MultiselectError:
    outcome = await multiexec(order_ids, on_cancelled_to_confirmed, order)
    return outcome

120 

121 

# GET /canceled-orders
#
# Lists orders through `orders_data_grabber.get_list`, after combining the
# caller-supplied `params.where` with a condition that pins the order status
# to `OrderStatus.canceled`. The combined query is written back onto `params`
# before the list call, and the authenticated user is passed along with it.
# The user is resolved with `company_logistician` as the required role.
@canceled_router.get("")
async def get_canceled_orders(
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
    params: BaseGetListQueryParams = Depends(),
) -> GenericGetListResponse[OrderDBModel]:
    canceled_only = {OrderDBModel.status: OrderStatus.canceled.value}
    params.where = update_search_query(params.where, canceled_only)
    listing = await orders_data_grabber.get_list(params, user)
    return listing

138 

139 

# GET /canceled-orders/{order_id}
#
# Builds a restriction query for the authenticated user via
# `flexible_company_assertion_query`, then fetches a single order with
# `orders_data_grabber.get_one_by_id_with_pattern`, passing the id, the
# query params and that restriction.
# The user is resolved with `company_logistician` as the required role.
# NOTE(review): unlike the list endpoint on this router, no canceled-status
# condition is visibly added here, so an order in another status may be
# returnable through this route -- confirm whether
# `get_one_by_id_with_pattern` or the restriction query already covers that.
@canceled_router.get("/{order_id}")
async def get_canceled_by_id(
    order_id: PydanticObjectIdPath,
    user: Annotated[
        SotransOIDCUserModel,
        Depends(
            idp.get_current_user(
                required_role_names=[SotransRole.company_logistician]
            )
        ),
    ],
    params: BaseGetOneQueryParams = Depends(),
) -> OrderDBModel:
    company_restriction = await flexible_company_assertion_query(user)
    found = await orders_data_grabber.get_one_by_id_with_pattern(
        order_id, params, company_restriction
    )
    return found