由于asyncio有非常多的内容,且对Python工程师非常重要,我将分为三篇文章来介绍它。本篇还不是 关于使用asyncio进行网络编程 的文章,而是继续并发主题,看看使用asyncio怎么实现高效的并发程序。
在Python 2的时代,高性能的网络编程主要是使用Twisted、Tornado和Gevent这三个库,但是它们的异步代码相互之间既不兼容也不能移植。如上一节说的,Gvanrossum希望在Python 3 实现一个原生的基于生成器的协程库,其中直接内置了对异步IO的支持,这就是asyncio,它在Python 3.4被引入到标准库。
Python 3.5添加了async和await这两个关键字,分别用来替换 asyncio.coroutine
和 yield from
。自此,协程成为新的语法,而不再是一种生成器类型了。事件循环与协程的引入,可以极大提高高负载下程序的I/O性能。除此之外还增加了 async with
(异步上下文管理)、 asyncfor
(异步迭代器)语法。特别说的是,在新发布的Python 3.6里面终于可以用 异步生成器 了!
顺便说一下Twisted。虽然在之前的公司Twisted使用的还挺广泛,它的Reactor、Factory、Deferred、Protocol等编程的思想很有启发性,在当时已经非常先进了,而asyncio也借鉴了一部分。但是它太重、大量的回调(Javascript工程师很容易接受,比如Deferred,小明我不喜欢)、没有及时更新的中文相关的技术文档和书籍所以学习曲线较高、没有更多的公司出来分享对应的实践,再加上协程的冲击,最近1-2年已经很少看到它的身影,不建议新人再去学习它了。
首先需要明确一点,asyncio使用单线程、单个进程的方式切换(通常程序等待读或写数据时就是切换上下文的时机),那这样效率高嘛?
实践是检验真理的唯一标准。我们用之前介绍的concurrent.futures和asyncio分别试验下。当然下面例子的结果仅供参考,因为无法保证被请求的网站的服务水平,这会造成对结果或多或少有影响,可以多跑几次综合的来看。
import time import requests from concurrent.futures import ThreadPoolExecutor NUMBERS = range(12) URL = 'http://httpbin.org/get?a={}' def fetch(a): r = requests.get(URL.format(a)) return r.json()['args']['a'] start = time.time() with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)): print('fetch({}) = {}'.format(num, result)) print('Use requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))
非常正统的方式,运行的效果如下:
❯ python3 scraper_thread.py fetch(0) = 0 fetch(1) = 1 fetch(2) = 2 fetch(3) = 3 fetch(4) = 4 fetch(5) = 5 fetch(6) = 6 fetch(7) = 7 fetch(8) = 8 fetch(9) = 9 fetch(10) = 10 fetch(11) = 11 Use requests+ThreadPoolExecutor cost: 6.493273019790649
注意,和ThreadPoolExecutor有关的实现都在scraper_thread.py一个文件中,让网络状态的影响更小,运行的效果也是在一次运行之后截取的。
现在我们加入asyncio再试试:
import asyncio async def run_scraper_tasks(executor): loop = asyncio.get_event_loop() blocking_tasks = [] for num in NUMBERS: task = loop.run_in_executor(executor, fetch, num) task.__num = num blocking_tasks.append(task) completed, pending = await asyncio.wait(blocking_tasks) results = {t.__num: t.result() for t in completed} for num, result in sorted(results.items(), key=lambda x: x[0]): print('fetch({}) = {}'.format(num, result)) start = time.time() executor = ThreadPoolExecutor(3) event_loop = asyncio.get_event_loop() event_loop.run_until_complete( run_scraper_tasks(executor) ) print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))
如果之前没有使用过asyncio的同学可能看完就一脸:flushed:了。我们之前编写多进程和多线程代码的时候,会感觉和我们的线性思维方法是一致的,所以写起来很舒服,理解和维护也相对容易。大家要做好一些心理准备,Python核心开发们在努力让Python开发者用同步编程的方式去写异步代码, 但是还是需要调整一下心态,做好迎接新的写法的准备。我来分析下这个例子:
当我们给一个函数添加了async关键字,就会把它变成一个异步函数。
每个线程有一个事件循环,主线程调用asyncio.get_event_loop时会创建事件循环,你需要把异步的任务丢给这个循环的run_until_complete方法,事件循环会 安排协同程序的执行 。和方法名字一样,异步的任务完成方法才会就执行完成了。
为了在asyncio中使用concurrent.futures的执行器,我这用到了run_in_executor,它可以接收要 同步 执行的任务。
给task设置__num属性,是因为后面的completed中的Future对象只包含结果,但是我们并不知道num是什么,所以hack了下,之后的例子中会有其他的方案,本文是给大家提供各种解题的思路,在合适的场景还是有用处的。
await asyncio.wait(blocking_tasks)
就是协同的执行那些同步的任务,直到完成。
最后根据__num找到和执行结果的对应关系,排序然后打印结果。
有一点得强调: async/await是Python提供的异步编程API,而asyncio只是一个利用 async/await API进行异步编程的框架
运行一下看看性能有没有提升:
fetch(0) = 0 fetch(1) = 1 fetch(2) = 2 fetch(3) = 3 fetch(4) = 4 fetch(5) = 5 fetch(6) = 6 fetch(7) = 7 fetch(8) = 8 fetch(9) = 9 fetch(10) = 10 fetch(11) = 11 Use asyncio+requests+ThreadPoolExecutor cost: 6.142597913742065
多跑几次可以发现和requests+ThreadPoolExecutor相比没有什么优势,就像是封装了一层,是有损耗的。
讲到这里,我们想想为什么asyncio的强大优势没有显示出来?
现存的一些库其实并不能原生的支持asyncio(因为会发生阻塞或者功能不可用),比如requests,如果要写爬虫,配合asyncio的应该用aiohttp,其他的如数据库驱动等各种Python对应的库也都得使用对应的aioXXX版本了。
我们看一下使用aiohttp会发生什么。第一步就是把fetch函数改成异步的:
import aiohttp async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a']
其实看起来和requests的接口差不多,只是你要熟悉这种编程模式就好了。
start = time.time() event_loop = asyncio.get_event_loop() tasks = [fetch_async(num) for num in NUMBERS] results = event_loop.run_until_complete(asyncio.gather(*tasks)) for num, result in zip(NUMBERS, results): print('fetch({}) = {}'.format(num, result))
代码比上个例子简单不少,这里需要注意,asyncio.gather可以按顺序搜集异步任务执行的结果,我们就不需要用到之前提过的__num(而且也hack不了,因为fetch_async是一个生成器,不能那样添加属性)
希望能进行协程切换的地方,就需要使用await关键字。如上的例子中r.json方法会等待I/O(也就是正在做一个网络请求),这种就可以切换去做其他的时候,之后再切换回来。
运行一下:
fetch(0) = 0 fetch(1) = 1 fetch(2) = 2 fetch(3) = 3 fetch(4) = 4 fetch(5) = 5 fetch(6) = 6 fetch(7) = 7 fetch(8) = 8 fetch(9) = 9 fetch(10) = 10 fetch(11) = 11 Use asyncio+aiohttp cost: 1.8903069496154785
有木有亮瞎眼,3倍的提升!!!
接着我们再加回ThreadPoolExecutor。之前说asyncio是单线程单进程的,那么我多线程同时运行,会不会翻倍 ╰( °▽° )╯
作为工程师,有想法就实践来验证下。写代码之前我们回忆一下,XXExecutor其实就是封装了队列,但是由于run_in_executor并不能传入异步的函数,我们不能按照例子2来用。独立使用队列其实效果应该和ThreadPoolExecutor差不多,那我们可不可以把任务平均切分一下,尽量让每个线程拿到的任务差不多。这就是我选择 NUMBERS = range(12)
的原因:可以均分。
async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return a, data['args']['a'] def sub_loop(numbers): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) tasks = [fetch_async(num) for num in numbers] results = loop.run_until_complete(asyncio.gather(*tasks)) for num, result in results: print('fetch({}) = {}'.format(num, result)) async def run(executor, numbers): await asyncio.get_event_loop().run_in_executor(executor, sub_loop, numbers) def chunks(l, size): n = math.ceil(len(l) / size) for i in range(0, len(l), n): yield l[i:i + n] event_loop = asyncio.get_event_loop() tasks = [run(executor, chunked) for chunked in chunks(NUMBERS, 3)] results = event_loop.run_until_complete(asyncio.gather(*tasks)) print('Use asyncio+aiohttp+ThreadPoolExecutor cost: {}'.format(time.time() - start))
我在解释下这个例子中的几点:
现在任务被拆分,不能用 zip(NUMBERS, results)
的方式拿到正确的num和结果的对应关系了,也由于由于不能给fetch_async加一个__num的属性随意直接改了任务的返回值,把num也返回了
chunks是一个给任务分组的函数,分三份是因为ThreadPoolExecutor用了三个线程。
非主线程不能使用主线程的事件循环对象,所以在sub_loop中我对重新设置了新的对象。
见证奇迹的时刻到了:
fetch(8) = 8 fetch(9) = 9 fetch(10) = 10 fetch(11) = 11 fetch(0) = 0 fetch(1) = 1 fetch(2) = 2 fetch(3) = 3 fetch(4) = 4 fetch(5) = 5 fetch(6) = 6 fetch(7) = 7 Use asyncio+aiohttp+ThreadPoolExecutor cost: 2.66983699798584
╮(╯_╰)╭ 忧伤,就算最后没有对结果排序,依然慢了一些。还是asyncio+aiohttp最好了。
为了验证多进程模式下的上述实验的效果,我找了一台服务器,把ThreadPoolExecutor都替换成ProcessPoolExecutor。我就直接贴答案了:
> python3 scraper_process.py ... Use requests+ProcessPoolExecutor cost: 2.2943034172058105 ... Use asyncio+requests+ThreadPoolExecutor cost: 2.609675407409668 ... Use asyncio+aiohttp cost: 0.6706254482269287 ... Use asyncio+aiohttp+ThreadPoolExecutor cost: 1.690920352935791
结论就是 在Python 3,请直接原生的使用asyncio 吧。
首先我们先补充点基础知识。先说「10K问题怎么解决」
在Nginx没有流行起来的时候,常被提到一个词 10K(并发1W)。在互联网的早期,网速很慢、用户群很小需求也只是简单的页面浏览,所以最初的服务器设计者们使用基于进程/线程模型,也就是一个TCP连接就是分配一个进程(线程)。谁都没有想到现在Web 2.0时候用户群里和复杂的页面交互问题,而现在即时通信和实在实时互动已经很普遍了。那么你设想如果每一个用户都和服务器保持一个(甚至多个)TCP连接才能进行实时的数据交互,别说BAT这种量级的网站,就是豆瓣这种比较小的网站,同时的并发连接也要过亿了。进程是操作系统最昂贵的资源,一台机器无法创建很多进程。如果要创建10K个进程,那么操作系统是无法承受的。就算我们不讨论随着服务器规模大幅上升带来复杂度几何级数上升的问题,采用分布式系统,只是维持1亿用户在线需要10万台服务器,成本巨大,也只有FLAG、BAT这样公司才有财力购买如此多的服务器。
为了解决这一问题,出现了「用同一进程/线程来同时处理若干连接」的思路,也就是I/O多路复用:
select。每个连接对应一个描述符(socket),循环处理各个连接,先查下它的状态,ready了就进行处理,不ready就不进行处理。但是缺点很多:
单个进程能够监视的文件描述符的数量存在最大限制
对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
需要维护一个用来存放大量的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
poll。本质上和select没有区别,但是由于它是基于链表来存储的,没有最大连接数的限制。缺点是:
大量的的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
poll的特点是「水平触发(只要有数据可以读,不管怎样都会通知)」,如果报告后没有被处理,那么下次poll时会再次报告它。
epoll。它使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。epoll支持水平触发和边缘触发,最大的特点在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,并且只会通知一次。使用epoll的优点很多:
没有最大并发连接的限制,能打开的fd的上限远大于1024(1G的内存上能监听约10万个端口)
效率提升,不是轮询的方式,不会随着fd数目的增加效率下降
内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销
因为Linux是互联网企业中使用率最高的操作系统,epoll就成为C10K killer、高并发、高性能、异步非阻塞这些技术的代名词了。FreeBSD推出了kqueue,Linux推出了epoll,Windows推出了IOCP,Solaris推出了/dev/poll。这些操作系统提供的功能就是为了解决C10K问题。epoll技术的编程模型就是异步非阻塞回调,也可以叫做Reactor、事件驱动、事件轮循(EventLoop)、libevent、Tornado、Node.js这些就是epoll时代的产物。
看了上面一段话,是不是感觉对着一坨概念理解的更清晰了呢?Python 3.4中还新增了一个与asyncio配套的新模块:selectors. 这个模块将select、epoll、kqueue等等系统级异步IO接口抽象成Selector类型,规定了统一的对外接口,于是程序只管使用selector的接口就行了。一般使用 selectors.DefaultSelector
就好了,它是这个模块根据你的系统自动帮你选择的最合适的Selector。
就这样小公司也可以玩高并发了。但是时代在发展,现在大家讨论的都是10M、100M这种挑战,而写过Node.js都知道异步嵌套回调非常难写,同样的问题也存在于Twisted:对代码的理解和调试都变得困难,维护性很低。上述的技术已经无能为力了。从前面的演化过程中,我们可以看到,根本的思路是要 高效的去阻塞,让CPU可以干核心的任务。所以,千万级并发实现的秘密:内核不是解决方案,而是问题所在!
这意味着:
不要让内核执行所有繁重的任务。将数据包处理,内存管理,处理器调度等任务从内核转移到应用程序高效地完成。让Linux只处理控制层,数据层完全交给应用程序来处理。
当连接很多时,首先需要大量的进程/线程来做事。同时系统中的应用进程/线程们可能大量的都处于 ready 状态,需要系统去不断的进行快速切换,而我们知道系统上下文的切换是有代价的。虽然现在Linux系统的调度算法已经设计的很高效了,但对于10M这样大规模的场景仍然力有不足。
所以我们面临的瓶颈有两个:
进程/线程作为处理单元还是太厚重
系统调度的代价太高
很自然地,我们会想到,如果有一种更轻量级的进程/线程作为处理单元,而且它们的调度可以做到很快(最好不需要锁),那就完美了。这个时候「协程」出现了,下一小节我们继续了解它。
它们在实现上都是试图用一组少量的线程来实现多个任务,一旦某个任务阻塞,则可能用同一线程继续运行其他任务,避免大量上下文的切换。每个协程所独占的系统资源往往只有栈部分。而且,各个协程之间的切换,往往是用户通过代码来显式指定的(跟各种 callback 类似),不需要内核参与,可以很方便的实现异步。
协程本质上也是异步非阻塞技术,它是将事件回调进行了包装,让程序员看不到里面的事件循环。程序员就像写阻塞代码一样简单。比如调用 client->recv() 等待接收数据时,就像阻塞代码一样写。实际上是底层库在执行recv时悄悄保存了一个状态,比如代码行数,局部变量的值。然后就跳回到EventLoop中了。什么时候真的数据到来时,它再把刚才保存的代码行数,局部变量值取出来,又开始继续执行。
简单的说,进程/线程是操作系统充当了EventLoop调度,而协程是自己用epoll进行调度。
协程是异步非阻塞的另外一种展现形式。Golang,Erlang,Lua协程都是这个模型。那什么是异步和非阻塞呢?
在网站可以找到很多对I/O模型进行对比和解释的文章,推荐阅读知友 严肃 对它们的理解](https://www.zhihu.com/question/19732473/answer/20851256)(已获得授权):
1.同步与异步 同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication) 所谓同步,就是在发出一个 调用 时,在没有得到结果之前,该 调用 就不返回。但是一旦调用返回,就得到返回值了。 换句话说,就是由 调用者 主动等待这个 调用 的结果。
而异步则是相反, 调用 在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在 调用 发出后,* 被调用者*通过状态、通知来通知调用者,或通过回调函数处理这个调用。
典型的异步编程模型比如Node.js,举个通俗的例子: 你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。 而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
阻塞与非阻塞 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
还是上面的例子, 你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。 在这里阻塞与非阻塞与是否同步异步无关。跟老板通过什么方式回答你结果无关。
事件循环是一种处理多并发量的有效方式,在 维基百科 中它被描述为「一种等待程序分配事件或消息的编程架构」,,我们可以定义事件循环来简化使用轮询方法来监控事件。它的意义最通俗的说法就是「当A发生时,执行B」。事件循环利用poller对象,使得程序员不用控制任务的添加、删除和事件的控制。事件循环使用回调方法来知道事件的发生。例如,有一个资源描述符A,当一个写事件在A中发生就会调用一个回调函数。如下应用都实现了事件循环:
Tornado web server
Twisted
Gevent
当然也包含asyncio,他是asyncio提供的「中央处理设备」,支持如下操作:
注册、执行和取消延迟调用(超时)
创建可用于多种类型的通信的服务端和客户端的Transports
启动进程以及相关的和外部通信程序的Transports
将耗时函数调用委托给一个线程池
单线程(进程)的架构也避免的多线程(进程)修改可变状态的锁的问题。
关于C10K、异步回调、协程、同步阻塞
聊聊C10K问题及解决方案
广告时间:欢迎参与我的首次知乎Live:「 Python 工程师的入门和进阶」 https://www.zhihu.com/lives/789840559912009728, 可以通过下面的二维码直接进入: