Coverage for api/utils/concurrency.py: 71%
7 statements
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
« prev ^ index » next coverage.py v7.6.2, created at 2024-10-10 03:02 +0300
1import asyncio
2from typing import Any, Awaitable, Callable
5def threadpool(func: Callable) -> Callable[..., Awaitable[Any]]:
6 async def async_wrapper(*args):
7 loop = asyncio.get_event_loop()
8 return await loop.run_in_executor(None, func, *args)
10 return async_wrapper