如何将 asyncio 与现有的阻塞库一起使用?

我的阻塞功能很少foo,bar我无法更改它们(一些我无法控制的内部库。与一个或多个网络服务对话)。我如何将其用作异步?例如,我不想做以下事情。

results = []
for inp in inps:
val = foo(inp)
result = bar(val)
results.append(result)
这将是低效的,因为我可以foo在等待第一个输入和相同的bar. 我如何包装它们以便它们可用于 asyncio(即 new async,await语法)?

让我们假设函数是可重入的。即foo在前一个foo正在处理时再次调用是可以的。

更新

使用可重用装饰器扩展答案。例如,请单击此处。

def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

return inner

已邀请:
这里有(某种)两个问题:第一,如何异步运行阻塞代码,第二,如何并发运行异步代码(asyncio 是单线程的,所以 GIL 仍然适用,所以它不是真正的并行,但是我离题了)。

可以使用 asyncio.ensure_future 创建并发任务,如此处所述。

要运行同步代码,您需要在 executor 中运行阻塞代码。例子:

import concurrent.futures
import asyncio
import time

def blocking(delay):
time.sleep(delay)
print('Completed.')

async def non_blocking(loop, executor):
# Run three of the blocking tasks concurrently. asyncio.wait will
# automatically wrap these in Tasks. If you want explicit access
# to the tasks themselves, use asyncio.ensure_future, or add a
# "done, pending = asyncio.wait..." assignment
await asyncio.wait(
fs={
# Returns after delay=12 seconds
loop.run_in_executor(executor, blocking, 12),

# Returns after delay=14 seconds
loop.run_in_executor(executor, blocking, 14),

# Returns after delay=16 seconds
loop.run_in_executor(executor, blocking, 16)
},
return_when=asyncio.ALL_COMPLETED
)

loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
loop.run_until_complete(non_blocking(loop, executor))
如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但底层方法是使用 for 循环(或列表理解等)安排任务,使用 asyncio 等待它们。等待,然后检索结果。例子:

done, pending = await asyncio.wait(
fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

要回复问题请先登录注册