Yukirito's cookbook

  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

Async IO in Python

发表于 2020-01-06 更新于 2020-04-02 分类于 Python
本文字数: 13k 阅读时长 ≈ 11 分钟

本文将会解释 Python 的异步模块 asyncio 的概念和基本用法。

Async IO 是一种并发编程设计,在 Python 中得到了专门的支持,从 Python 3.4迅速发展到3.7,甚至可能更高。

以下你将会涉及的内容

  • 异步 IO (Asynchronous IO | async IO) : 一种与语言无关的范例(模型) ,其实现跨多种编程语言
  • async/await: 两个用于定义协同程序的新 Python 关键字
  • asyncio: Python 包,它为运行和管理协同程序提供了基础和 API

协同程序(专门的生成器函数)是 Python 中异步 IO 的核心,稍后我们将深入讨论它们。

准备工作

工作环境

  1. 在本文中,我使用术语 async IO 来表示异步 IO 的语言无关设计,而 asyncio 指的是 Python 包。
  2. 你需要 Python 3.7或者更高版本来完整地阅读本文,还需要 aiohttp 和 aiofiles 包
    1
    2
    3
    4
    >$ python3.7 -m venv ./py37async
    >$ source ./py37async/bin/activate#Windows:.\py37async\Scripts\activate.bat
    >$ pip install --upgrade pip aiohttp aiofiles # Optional: aiodns
    >

在Async IO前世今生

Async IO用在什么地方?

Async IO 比起多进程和线程, 知名度会更低一点, 来一点一点了解。

Parallelism(并行性): 在同一时间执行多个操作。
Multiprocessing(多进程): 是一种实现Parallelism(并行性)的手段, 它需要在计算机的中央处理单元(cpu 或内核)上分配任务。 多进程非常适合处理CPU密集型(CPU-bound)的任务: 非常依赖循环或者数学计算通常属于这一类。
Concurrency(并发性)是一个比并行性稍宽泛的术语。 它表明多个任务能够以重叠的方式运行。 (有一种说法是,并发并不意味着并行。)

线程是一种并发执行模型,多个线程轮流执行任务。 一个进程可以包含多个线程。 因为全局解释器锁(GIL)的关系,Python 与线程的关系非常复杂,但这超出了本文的范围。

了解线程处理的重要性在于,它更适合处理IO密集型(IO-bound)相关的任务。 当计算机内核工作从开始到结束时,主要的消耗都在输入 / 输出上面。

总结一下,Concurrency(并发性)包括Multiprocessing(多进程)(适合处理CPU密集型的任务)和threading(线程)(适合处理IO密集型的任务)。 Multiprocessing(多进程)是Parallelism(并行性)的一种形式,Parallelism(并行性)是Concurrency(并发性)的一种特定类型(子集)。 Python multiprocessing、threading 和 concurrent.futures包 为这两者提供了长期的支持。

现在是时候加入一个新成员了。 在过去的几年中,一个分离的设计已经更加全面地内置到 CPython 中: asynchronous IO(异步 IO),通过标准库的 asyncio 包和新的 async 和 await 关键字启用。
需要澄清的是,asynchronous IO 并不是一个新发明的概念,它已经存在或者正在被构建到其他语言和运行时环境中,比如 Go、 C# 或者 Scala。

Python 文档将 asyncio 包称为编写并发代码的库。 但是,异步 IO 不是线程,也不是多进程。 它不是建立在这两者之上的。

实际上,async IO 是一种单线程、单进程的设计: 它使用cooperative multitasking(协作多任务),本笔记结束时你将充实这个术语。 换句话说,尽管在单个进程中使用单个线程,async IO 可以给人一种并发的感觉。 协同程序(async IO 的一个主要特性)可以并发调度,但它们本身并不是并发的。

重申一下,async IO 是并发编程的一种风格,但它不是Parallelism。 与Multiprocessing相比,它更接近于threading,但与这两者非常不同,是并发技巧中的一个独立成员。

还剩下一个问题, asynchronous是什么意思? 要注意下面asynchronous(异步)的介绍并不是一个严格的定义, 但是我们可以如此描述:

  • Asynchronous routines(异步例程)可以在等待最终结果时“暂停” ,同时让其他routines运行

  • 通过上述机制,异步代码促进了并发执行。 换句话说,异步代码给出了并发的外观和感觉

    这里有一个图表把它们放在一起。 白色术语代表概念,绿色术语代表实施或生效的方式

ok, 对于并发编程模型之间的比较先停止。 这个笔记主要关注async IO的子组件、如何使用它以及围绕它涌现的 api。 要彻底探索threading与multiprocessing相对于async IO的区别,请查看 Jim Anderson 对 Python 中并发性的概述: overview of concurrency in Python

async IO 解释

异步输入输出一开始看起来似乎是违反直觉和矛盾的。 搞并发代码的东西如何使用一个线程和一个 CPU 核心? 下面这个例子很好地解释了一切:

陈刀仔之徒卢本伟举办了一场斗地主争霸赛,在这场比赛中,他将与多名水友打牌, 并夺得胜利。 他有两种比赛的方式: 同步和异步。

假设:

  • 24个对手
  • 卢本伟在5秒内打出1次牌
  • 每局游戏平均每人要打出30次牌(总共一局60次)

Synchronous version(同步版本): 卢本伟1次玩1局斗地主,永远不要同时玩两个,直到游戏结束。 每场比赛需时(55 + 5) * 30==1800秒,或者说30分钟。 整个比赛历时24 * 30==720分钟,也就是12小时。

Asynchronous version(异步版本): 卢本伟从一张比赛桌移动到另一张比赛桌,每个桌子移动一次。 当他离开桌子的时候,让水友在等待期间采取下一步行动。 24场比赛中每遍历一次需要卢本伟24 * 5==120秒,或2分钟。 整场斗地主争霸赛现在缩短到120 * 30==3600秒,或者只有1个小时。

只有一位卢本伟,他只有一双手,一次只能移动打一次牌。 但是在赛场不停地切换比赛座, 让时间从12小时减少到了1小时。 因此, cooperative multitasking(协同多任务)是一种fancy的说法,即程序的事件循环(稍后再详述)与多个任务通信,让每个任务在最佳时间轮流运行。

Async IO 需要很长的等待时间,否则函数会被阻塞,并允许其他函数在停机期间运行。 (阻塞函数有效地阻止其他函数从启动到返回的时间内运行。)

asyncio 模块 和 async/await

现在您已经了解了async IO 设计的一些背景知识,接下来让我们探索 Python 的实现。 Python 的 asyncio 包(在 Python 3.4中引入)及其两个关键字 async 和 await 可以达到不同的目的,但是它们一起可以帮助您声明、构建、执行和管理异步代码。

关于 async/await 的语法 和 Native Coroutines(本地协程)

A Word of Caution:

小心你在网上读到的东西。 async IO API 已经从 Python 3.4迅速发展到 Python 3.7。 一些旧的模式已经不再使用,一些最初被禁止的东西现在可以通过新的引入来使用。 本笔记也将很快加入过时的垃圾桶。 - 2020-1-6 17:52:02

async IO 的核心是 coroutines (协同程序)。 coroutines 是 Python 生成器函数的特殊版本。 让我们从一个 baseline(基础版本)定义开始,然后在这里进行构建: 协同程序是一个函数,它可以在到达 return 之前暂停执行,并且可以间接地将控制权传递给另一个 coroutine 一段时间。

稍后,你将更深入地了解传统生成器到底是如何重新用于协同程序的。 目前,了解协同程序如何工作的最简单方法是开始制作一些协同程序。

asyncio API

下面介绍 asyncio 模块最主要的几个API。注意,必须使用 Python 3.7 或更高版本,早期的语法已经变了。

第一步,import 加载 asyncio 模块。

1
import asyncio

第二步,函数前面加上 async 关键字,就变成了 async 函数。这种函数最大特点是执行可以暂停,交出执行权。

1
2
> async def main():
>

第三步,在 async 函数内部的异步任务前面,加上await命令。

1
2
> await asyncio.sleep(1)
>

上面代码中,asyncio.sleep(1) 方法可以生成一个异步任务,休眠1秒钟然后结束。

执行引擎遇到await命令,就会在异步任务开始执行之后,暂停当前 async 函数的执行,把执行权交给其他任务。等到异步任务结束,再把执行权交回 async 函数,继续往下执行。

第四步,async.run() 方法加载 async 函数,启动事件循环。

1
2
> asyncio.run(main())
>

上面代码中,asyncio.run() 在事件循环上监听 async 函数main的执行。等到 main 执行完了,事件循环才会终止。

asyncio 模块在单线程上启动一个事件循环(event loop),时刻监听新进入循环的事件,加以处理,并不断重复这个过程,直到异步任务结束。事件循环的内部机制,可以参考 JavaScript 的模型,两者是一样的。


async 函数示例

让我们采用浸入式方法编写一些异步 IO 代码。 这个简短的程序是 async IO 的 Hello World,但是在说明其核心功能方面还有很长的路要走:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# count_async.py

import asyncio

async def count():
print("One")
await asyncio.sleep(1)
print("Two")

async def main():
await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
import time
s = time.perf_counter()
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")

上面脚本中,在 async 函数main的里面,asyncio.gather() 方法将多个异步任务(三个 count())包装成一个新的异步任务,必须等到内部的多个异步任务都执行结束,这个新的异步任务才会结束。

脚本的运行结果如下。

1
2
3
4
5
6
7
8
$ python3 count_async.py
One
One
One
Two
Two
Two
count_async.py executed in 1.01 seconds.

上面运行结果的原因是,三个 count() 依次执行,打印完 One,就休眠1秒钟,把执行权交给下一个 count(),所以先连续打印出三个 One。等到1秒钟休眠结束,执行权重新交回第一个 count(),开始执行 await 命令下一行的语句,所以会接着打印出三个Two。脚本总的运行时间是1秒。

这个输出的顺序是异步 IO 的核心。 对 count ()的每个调用进行通信是单个事件循环或协调器。 当每个任务到达并等待 asyncio.sleep (1)时,该函数对事件循环大喊,并将控制权交还给它,它说: “我要睡一秒钟。 在此期间,继续做一些有意义的事情。”

作为对比,下面是这个例子的同步版本 synchronous.py。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python3
# count_synchronous.py

import time

def count():
print("One")
time.sleep(1)
print("Two")

def main():
for _ in range(3):
count()

if __name__ == "__main__":
s = time.perf_counter()
main()
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")

执行时,顺序和执行时间会有一个细微但关键的变化:

1
2
3
4
5
6
7
8
$ python3 count_synchronous.py
One
Two
One
Two
One
Two
count_synchronous.py executed in 3.01 seconds.

虽然使用 time.sleep ()和 asyncio.sleep ()可能看起来很老套,但它们被用作任何涉及等待时间的时间密集型进程的替代品。 (你可以等待的最平凡的事情就是一个基本上什么都不做的睡眠调用。) 也就是说,time.sleep ()可以表示任何耗时的阻塞函数调用,而 asyncio.sleep ()用于代替非阻塞调用(但也需要一些时间才能完成)。

正如你将在下一节中看到的,等待包括 asyncio.sleep ()在内的内容的好处是,周围的函数可以暂时将控制权交给另一个更容易立即执行某些操作的函数。 相比之下,time.sleep ()或任何其他阻塞调用都与异步 Python 代码不兼容,因为它会在休眠期间停止一切。

Async IO的规则

在这里, 一个关于 async, await, 和coroutine functions 的更加正式的定义是它们在创建的时候是有序的

  • 语法 async def 引入了一个 native coroutine(本地协程) 或者说一个 asynchronous generator(异步生成器) , 而表达式 async with 和 async for 也是合法的, 稍后你会看到它们
  • 关键字 await 将函数控制传递回 event loop(事件循环)。 (它暂停执行周围的协同程序。) 如果 Python 遇到 g ()范围内的 await f ()表达式,这就是 await 告诉事件循环,“暂停 g ()的执行,直到返回我正在等待的 f ()的结果。 与此同时,让其它部分依旧运行。”

在代码中,第二个要点看起来大致如下:

1
2
3
4
async def g():
# Pause here and come back to g() when f() is ready
r = await f()
return r

对于何时以及如何使用 async / await,还有一套严格的规则。 无论你是否还在学习语法,或者已经接触过使用 async / await,这些方法都很方便:

  • 使用 async def 引入的函数是 coroutine。 它可以使用await, return, 或者 yield,但所有这些都是可选的。 声明 async def noop () : pass 是合法的:

    • 使用 await and/or return 创建一个 coroutine 函数。 要调用协同程序函数,必须 await 它去获取它的结果。
    • 在 async def 块中使用 yield 不太常见(而且最近在 Python 中才是合法的)。 这将创建一个异步生成器,您可以使用 async for 对其进行迭代。 暂时忘记异步生成器,专注于认真对待 coroutine 函数的语法,这些函数使用 await and/or return。
    • 任何使用 async def 定义的内容都不能使用 yield from,这将引发 SyntaxError 错误。
  • 就像在 def 函数之外使用 yield 是 SyntaxError 一样,在 async def coroutine 之外使用 await 也是 SyntaxError 。 你只能在协同程序的主体(body)中使用 await 。

    以下是一些简短的例子,旨在总结上述几条规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
async def f(x):
y = await z(x) # OK - `await` and `return` allowed in coroutines
return y

async def g(x):
yield x # OK - this is an async generator

async def m(x):
yield from gen(x) # No - SyntaxError

def m(x):
y = await z(x) # Still no - SyntaxError (no `async def` here)
return y

最后, 当你使用 await f()的时候, f()必须是一个 awaitable 的对象.
啥? 这没什么卵用?
现在你只需知道一个awaitable的对象必须是(1)另一个coroutine, 或者是(2)一个定义了 .__await__()dunder方法而且返回一个迭代器的对象。
如果您正在编写一个程序,对于大多数目的来说,您只需要担心(1)类情况。

这又给我们带来了另一个技术上的区别,您可能会看到它的弹出: 将函数标记为 coroutine 的一种较老的方法是使用@asyncio 来装饰一个普通的 def 函数。 协同作用。 结果是一个基于生成器的协同程序( generator-based coroutine )。 由于在 Python 3.5中使用了 async / await 语法,这种结构已经过时。

这两个 协程 基本上是等价的(两者都是 awaitable ) ,但第一个协程是基于生成器的( generator-based ),而第二个协程是本地协同程序(native coroutine):

1
2
3
4
5
6
7
8
9
10
import asyncio

@asyncio.coroutine
def py34_coro():
"""Generator-based coroutine, older syntax"""
yield from stuff()

async def py35_coro():
"""Native coroutine, modern syntax"""
await stuff()

如果您自己编写任何代码,为了显式而不是隐式( explicit rather than implicit ),请选择 native coroutines。 基于生成器的协程将在 Python 3.10中删除( removed)。

在本笔记的后半部分,我们将仅仅为了解释而涉及一点点基于生成器的协程。 之所以引入 async / await,是为了使协同程序成为 Python 的一个独立特性,可以轻松地将其与普通的生成器函数区分开来,从而减少模糊性。

不要陷入基于生成器的协同程序,这些协同程序已经被 async / await 故意过时( deliberately outdated )了。 它们有自己的小规则集(例如,await 不能用于基于生成器的协同程序) ,如果坚持使用 async / await 语法,那么这些规则在很大程度上是不相关的。

我们来看一些更复杂的栗子。

这里有一个关于 async IO 如何减少等待时间的栗子: 给定一个 coroutine, 维持(maintain)一个持续产生在range[0, 10]之间随机整数的 makerandom() 函数, 直到其中一个超过阈值(threshold)为止 。

你希望让这个 coroutine 的多个调用不需要相互等待连续地完成。 您可以基本上遵循上面两个脚本中的模式,只需稍作修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colors
c = (
"\033[0m", # End of color
"\033[36m", # Cyan
"\033[91m", # Red
"\033[35m", # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
print(c[idx + 1] + f"Initiated makerandom({idx}).")
i = random.randint(0, 10)
while i <= threshold:
print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
await asyncio.sleep(idx + 1)
i = random.randint(0, 10)
print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
return i

async def main():
res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))
return res

if __name__ == "__main__":
random.seed(444)
r1, r2, r3 = asyncio.run(main())
print()
print(f"r1: {r1}, r2: {r2}, r3: {r3}")

彩色输出比我能说的要多得多,并且让你了解这个脚本是如何执行的:

这个程序使用一个main coroutine,makerandom () ,并且在3个不同的输入端并发地运行它。 大多数程序将包含小的、模块化的协程和一个将每个较小的协程整合在一起的 wrapper函数。 然后使用 main ()通过在一些迭代器或池(pool)之间映射到central coroutine来收集任务(futures)。

在这个小栗子中,池(pool)是range (3)。 在后面提供的一个更完整的示例中,它是一组需要被请求、解析和并发处理的 URL,而main ()封装了每个 URL 的整个例程(routine)。

虽然“生成随机整数”(比其他任何方法都要 CPU-bound )可能不是 asyncio 的最佳选择,但是在这个示例中,asyncio.sleep ()的出现是为了模拟 IO-bound 的进程,其中涉及不确定的等待时间。 例如,asyncio.sleep ()调用可能表示在消息应用程序中的两个客户机之间发送和接收非随机整数。

Async IO 设计模式

Async IO 提供了自己的一组可能的脚本设计,本节将介绍这些设计。

Chaining Coroutines

协程的一个关键特性是它们可以链接在一起。 (记住,一个协同程序对象是可以被唤醒的( awaitable ),所以另一个协同程序可以等待 (await) 它。) 这样你就可以把程序分解成更小的、可管理的、可回收的协同程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time

async def part1(n: int) -> str:
i = random.randint(0, 10)
print(f"part1({n}) sleeping for {i} seconds.")
await asyncio.sleep(i)
result = f"result{n}-1"
print(f"Returning part1({n}) == {result}.")
return result

async def part2(n: int, arg: str) -> str:
i = random.randint(0, 10)
print(f"part2{n, arg} sleeping for {i} seconds.")
await asyncio.sleep(i)
result = f"result{n}-2 derived from {arg}"
print(f"Returning part2{n, arg} == {result}.")
return result

async def chain(n: int) -> None:
start = time.perf_counter()
p1 = await part1(n)
p2 = await part2(n, p1)
end = time.perf_counter() - start
print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
import sys
random.seed(444)
args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
start = time.perf_counter()
asyncio.run(main(*args))
end = time.perf_counter() - start
print(f"Program finished in {end:0.2f} seconds.")

请仔细关注输出,其中 part1() 睡眠时间可变, part2() 开始处理可用的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$ python3 chained.py 9 6 3
part1(9) sleeping for 4 seconds.
part1(6) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(9) == result9-1.
part2(9, 'result9-1') sleeping for 7 seconds.
Returning part1(6) == result6-1.
part2(6, 'result6-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(6, 'result6-1') == result6-2 derived from result6-1.
-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).
Returning part2(9, 'result9-1') == result9-2 derived from result9-1.
-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).
Program finished in 11.01 seconds.

在这种设置中,main()的运行时间等于它收集并调度的任务的最大运行时间

References

  • Async IO in Python: A Complete Walkthrough
  • Overview of Concurrency in Python
如果我的文章帮到了你, 那我就很开心啦~(๑╹◡╹)ノ
Yukirito 微信支付

微信支付

Yukirito 支付宝

支付宝

ArrayList Excecises
2020 月个四前
  • 文章目录
  • 站点概览
Yukirito

Yukirito

Yonghua Chan(陈 永华)
8 日志
6 分类
2 标签
  1. 1. 准备工作
  2. 2. 在Async IO前世今生
    1. 2.1. Async IO用在什么地方?
    2. 2.2. async IO 解释
  3. 3. asyncio 模块 和 async/await
    1. 3.1. 关于 async/await 的语法 和 Native Coroutines(本地协程)
      1. 3.1.1. asyncio API
      2. 3.1.2. async 函数示例
    2. 3.2. Async IO的规则
  4. 4. Async IO 设计模式
    1. 4.1. Chaining Coroutines
  5. 5. References
© 2020 Yukirito | 34k | 31 分钟