Coverage for api/utils/concurrency.py: 71%

7 statements  

« prev     ^ index     » next       coverage.py v7.6.2, created at 2024-10-10 03:02 +0300

1import asyncio 

2from typing import Any, Awaitable, Callable 

3 

4 

def threadpool(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Turn a blocking callable into a coroutine function run in a worker thread.

    The returned coroutine function schedules ``func`` on the running event
    loop's default executor (``run_in_executor(None, ...)``) and awaits the
    result, so the event loop is not blocked while ``func`` executes.

    Args:
        func: A synchronous callable to execute in the default executor.

    Returns:
        An ``async`` wrapper that accepts the same positional and keyword
        arguments as ``func`` and resolves to ``func``'s return value.
        Exceptions raised by ``func`` propagate to the awaiting caller.
    """
    # Function-scope import keeps this block self-contained without having
    # to touch the module's top-level import section.
    import functools

    @functools.wraps(func)  # preserve __name__, __doc__, __wrapped__
    async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the recommended call in that situation.
        loop = asyncio.get_running_loop()
        # run_in_executor() forwards positional arguments only, so keyword
        # arguments must be bound up front with functools.partial.
        bound_call = functools.partial(func, *args, **kwargs)
        return await loop.run_in_executor(None, bound_call)

    return async_wrapper