我们今天继续深入学习asyncio。
asyncio模块包含多种同步机制,每个原语的解释可以看 线程篇 ,这些原语的用法上和线程/进程有一些区别。
并发的去爬取显然可以让爬虫工作显得更有效率,但是我们应该把抓取做的无害,这样既可以保证我们不容易发现,也不会对被爬的网站造成一些额外的压力。
在这里吐槽下,豆瓣现在几乎成了爬虫练手专用网站,我个人也不知道为啥?欢迎留言告诉我。难道是豆瓣一直秉承尊重用户的原则不轻易对用户才去封禁策略,造成大家觉得豆瓣最适合入门么?BTW,我每天在后台都能看到几十万次无效的抓取,也就是抓取程序写的有问题,但还在不停地请求着...
好吧回到正题,比如我现在要抓取http://httpbin.org/get?a=X这样的页面,X为1-10000的数字,一次性的产生1w次请求显然很快就会被封掉。那么我们可以用Semaphore控制同时的并发量(例子中为了演示,X为0-11):
import aiohttp import asyncio NUMBERS = range(12) URL = 'http://httpbin.org/get?a={}' sema = asyncio.Semaphore(3) async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a'] async def print_result(a): with (await sema): r = await fetch_async(a) print('fetch({}) = {}'.format(a, r)) loop = asyncio.get_event_loop() f = asyncio.wait([print_result(num) for num in NUMBERS]) loop.run_until_complete(f)
在运行的时候可以感受到并发受到了信号量的限制,基本保持在同时处理三个请求的标准。
看下面的例子:
❯ cat lock.py import asyncio import functools def unlock(lock): print('callback releasing lock') lock.release() async def test(locker, lock): print('{} waiting for the lock'.format(locker)) with await lock: print('{} acquired lock'.format(locker)) print('{} released lock'.format(locker)) async def main(loop): lock = asyncio.Lock() await lock.acquire() loop.call_later(0.1, functools.partial(unlock, lock)) await asyncio.wait([test('l1', lock), test('l2', lock)]) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()
这个例子中我们首先使用acquire加锁,通过call_later方法添加一个0.1秒后释放锁的函数。看一下调用:
❯ python3 lock.py l1 waiting for the lock l2 waiting for the lock callback releasing lock l1 acquired lock l1 released lock l2 acquired lock l2 released lock
我们根据线程篇Condition的例子,改成一下:
import asyncio import functools async def consumer(cond, name, second): await asyncio.sleep(second) with await cond: await cond.wait() print('{}: Resource is available to consumer'.format(name)) async def producer(cond): await asyncio.sleep(2) for n in range(1, 3): with await cond: print('notifying consumer {}'.format(n)) cond.notify(n=n) await asyncio.sleep(0.1) async def producer2(cond): await asyncio.sleep(2) with await cond: print('Making resource available') cond.notify_all() async def main(loop): condition = asyncio.Condition() task = loop.create_task(producer(condition)) consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))] await asyncio.wait(consumers) task.cancel() task = loop.create_task(producer2(condition)) consumers = [consumer(condition, name, index) for index, name in enumerate(('c1', 'c2'))] await asyncio.wait(consumers) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()
这次演示了2种通知的方式:
使用notify方法挨个通知单个消费者
使用notify_all方法一次性的通知全部消费者
由于producer和producer2是异步的函数,所以不能使用之前call_later方法,需要用create_task把它创建成一个任务(Task)。但是最后记得要把任务取消掉。
执行以下看看效果:
❯ python3 condition.py notifying consumer 1 c1: Resource is available to consumer notifying consumer 2 c2: Resource is available to consumer Making resource available c1: Resource is available to consumer c2: Resource is available to consumer
模仿锁的例子实现:
import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def test(name, event): print('{} waiting for event'.format(name)) await event.wait() print('{} triggered'.format(name)) async def main(loop): event = asyncio.Event() print('event start state: {}'.format(event.is_set())) loop.call_later( 0.1, functools.partial(set_event, event) ) await asyncio.wait([test('e1', event), test('e2', event)]) print('event end state: {}'.format(event.is_set())) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()
看起来也确实和锁的意思很像,不同的是,事件被触发时,2个消费者不用获取锁就要尽快的执行下去了。
在asyncio官网上已经举例了2个很好的 队列例子 了,这文就不重复了。asyncio同样支持LifoQueue和PriorityQueue,我们体验下aiohttp+优先级队列的用法吧:
import asyncio import random import aiohttp NUMBERS = random.sample(range(100), 7) URL = 'http://httpbin.org/get?a={}' sema = asyncio.Semaphore(3) async def fetch_async(a): async with aiohttp.request('GET', URL.format(a)) as r: data = await r.json() return data['args']['a'] async def collect_result(a): with (await sema): return await fetch_async(a) async def produce(queue): for num in NUMBERS: print('producing {}'.format(num)) item = (num, num) await queue.put(item) async def consume(queue): while 1: item = await queue.get() num = item[0] rs = await collect_result(num) print('consuming {}...'.format(rs)) queue.task_done() async def run(): queue = asyncio.PriorityQueue() consumer = asyncio.ensure_future(consume(queue)) await produce(queue) await queue.join() consumer.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(run()) loop.close()
看到使用了新的ensure_future方法,其实它和之前说的create_task意思差不多,都是为了把一个异步的函数变成一个协程的Task。它们的区别是:
create_task是AbstractEventLoop的抽象方法,不同的loop可以实现不同的创建Task方法,这里用的是BaseEventLoop的实现。
ensure_future是asyncio封装好的创建Task的函数,它还支持一些参数,甚至指定loop。一般应该使用它,除非用到后面提到的uvloop这个第三方库。
这个例子中,首先我们从0-99中随机取出7个数字,放入优先级队列,看看消费者是不是按照从小到大的顺序执行的呢?
❯ python3 prioqueue.py producing 6 producing 4 producing 22 producing 48 producing 9 producing 90 producing 40 consuming 4... consuming 6... consuming 9... consuming 22... consuming 40... consuming 48... consuming 90...
确实是这样的。
说到这里,看看Task是什么?
Task类用来管理协同程序运行的状态。根据源码,我保留核心,实现一个简单的Task类帮助大家理解:
import asyncio class Task(asyncio.futures.Future): def __init__(self, gen, *,loop): super().__init__(loop=loop) self._gen = gen self._loop.call_soon(self._step) def _step(self, val=None, exc=None): try: if exc: f = self._gen.throw(exc) else: f = self._gen.send(val) except StopIteration as e: self.set_result(e.value) except Exception as e: self.set_exception(e) else: f.add_done_callback( self._wakeup) def _wakeup(self, fut): try: res = fut.result() except Exception as e: self._step(None, e) else: self._step(res, None)
如果_step方法没有让协程执行完成,就会添加回调,_wakeup又会继续执行_step... 直到协程程序完成,并set_result。
写个使用它的例子:
async def foo(): await asyncio.sleep(2) print('Hello Foo') async def bar(): await asyncio.sleep(1) print('Hello Bar') loop = asyncio.get_event_loop() tasks = [Task(foo(), loop=loop), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close()
第一个任务是用我们自己的Task创建的,第二个是用BaseEventLoop自带的create_task。
运行一下:
❯ python3 task.py Hello Bar Hello Foo
自定义的Task类和asyncio自带的是可以好好协作的。
asyncio根据你的操作系统信息会帮你选择默认的事件循环类,在*nix下使用的类继承于BaseEventLoop,在上面已经提到了。和Task一样,我们剥离出一份最核心的实现:
import asyncio from collections import deque def done_callback(fut): fut._loop.stop() class Loop: def __init__(self): self._ready = deque() self._stopping = False def create_task(self, coro): Task = asyncio.tasks.Task task = Task(coro, loop=self) return task def run_until_complete(self, fut): tasks = asyncio.tasks # 获取任务 fut = tasks.ensure_future( fut, loop=self) # 增加任务到self._ready fut.add_done_callback(done_callback) # 跑全部任务 self.run_forever() # 从self._ready中移除 fut.remove_done_callback(done_callback) def run_forever(self): try: while 1: self._run_once() if self._stopping: break finally: self._stopping = False def call_soon(self, cb, *args): self._ready.append((cb, args)) def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): t, a = self._ready.popleft() t(*a) def stop(self): self._stopping = True def close(self): self._ready.clear() def call_exception_handler(self, c): pass def get_debug(self): return False
其中call_exception_handler和get_debug是必须存在的。
写个例子用一下:
async def foo(): print('Hello Foo') async def bar(): print('Hello Bar') loop = Loop() tasks = [loop.create_task(foo()), loop.create_task(bar())] loop.run_until_complete( asyncio.wait(tasks)) loop.close()
执行:
❯ python3 loop.py Hello Foo Hello Bar
也可以和asyncio.wait正常协作了。
PS:本文全部代码可以在 微信公众号文章代码库项目 ( https://github.com/dongweiming/mp/tree/master/2016-12-2 0 ) 中找到。
广告时间:欢迎参与我的首次知乎Live:「 Python 工程师的入门和进阶」 https://www.zhihu.com/lives/789840559912009728, 可以通过下面的二维码直接进入: