asyncio 模块包含多种同步机制。每个原语的解释可以参考「线程篇」,不过这些原语在用法上和线程/进程版本有一些区别。
import aiohttp
import asyncio

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
# Shared by every print_result() call: at most three may hold it at once.
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    """Request ``URL`` formatted with ``a`` and return ``args['a']`` from the JSON reply."""
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def print_result(a):
    """Fetch ``a`` while holding the semaphore and print the result."""
    # ``with (await sema)`` was removed in Python 3.9; ``async with`` is the
    # supported way to acquire and release an asyncio primitive.
    async with sema:
        r = await fetch_async(a)
        print('fetch({}) = {}'.format(a, r))


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
# asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap each in a task.
f = asyncio.wait([loop.create_task(print_result(num)) for num in NUMBERS])
loop.run_until_complete(f)
loop.close()
# lock.py
import asyncio
import functools


def unlock(lock):
    """Release ``lock``; run as a plain (non-coroutine) loop callback."""
    print('callback releasing lock')
    lock.release()


async def test(locker, lock):
    """Wait for ``lock``, hold it for one print, and report each stage.

    ``locker`` is only a label used in the printed messages.
    """
    print('{} waiting for the lock'.format(locker))
    # ``with await lock`` was removed in Python 3.9; use ``async with``.
    async with lock:
        print('{} acquired lock'.format(locker))
    print('{} released lock'.format(locker))


async def main(loop):
    """Take the lock, schedule its release, and let two waiters contend for it."""
    lock = asyncio.Lock()
    await lock.acquire()
    # The lock is released from a timer callback 0.1s from now, so both
    # waiters print their "waiting" line before either can acquire it.
    loop.call_later(0.1, functools.partial(unlock, lock))
    # asyncio.wait() rejects bare coroutines on Python 3.11+; pass tasks.
    await asyncio.wait([
        loop.create_task(test('l1', lock)),
        loop.create_task(test('l2', lock)),
    ])


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
loop.run_until_complete(main(loop))
loop.close()
❯ python3 lock.py
l1 waiting for the lock
l2 waiting for the lock
callback releasing lock
l1 acquired lock
l1 released lock
l2 acquired lock
l2 released lock
import asyncio
import functools


async def consumer(cond, name, second):
    """Sleep ``second`` seconds, then block on ``cond`` until notified.

    ``name`` is only a label used in the printed message.
    """
    await asyncio.sleep(second)
    # ``with await cond`` was removed in Python 3.9; use ``async with``.
    async with cond:
        await cond.wait()
        print('{}: Resource is available to consumer'.format(name))


async def producer(cond):
    """After 2s, notify in two rounds: ``notify(n=1)``, then ``notify(n=2)``."""
    await asyncio.sleep(2)
    for n in range(1, 3):
        async with cond:
            print('notifying consumer {}'.format(n))
            cond.notify(n=n)
        # Sleep outside the lock so a woken consumer can re-acquire it.
        await asyncio.sleep(0.1)


async def producer2(cond):
    """After 2s, wake every waiting consumer at once with ``notify_all()``."""
    await asyncio.sleep(2)
    async with cond:
        print('Making resource available')
        cond.notify_all()


async def main(loop):
    """Run two consumers against ``producer``, then again against ``producer2``."""
    condition = asyncio.Condition()

    task = loop.create_task(producer(condition))
    # asyncio.wait() rejects bare coroutines on Python 3.11+; pass tasks.
    consumers = [loop.create_task(consumer(condition, name, index))
                 for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()

    task = loop.create_task(producer2(condition))
    consumers = [loop.create_task(consumer(condition, name, index))
                 for index, name in enumerate(('c1', 'c2'))]
    await asyncio.wait(consumers)
    task.cancel()


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
loop.run_until_complete(main(loop))
loop.close()
❯ python3 condition.py
notifying consumer 1
c1: Resource is available to consumer
notifying consumer 2
c2: Resource is available to consumer
Making resource available
c1: Resource is available to consumer
c2: Resource is available to consumer
import asyncio
import functools


def set_event(event):
    """Set ``event``; run as a plain (non-coroutine) loop callback."""
    print('setting event in callback')
    event.set()


async def test(name, event):
    """Block until ``event`` is set, printing before and after the wait.

    ``name`` is only a label used in the printed messages.
    """
    print('{} waiting for event'.format(name))
    await event.wait()
    print('{} triggered'.format(name))


async def main(loop):
    """Create an event, schedule it to be set in 0.1s, and run two waiters."""
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    loop.call_later(
        0.1, functools.partial(set_event, event)
    )
    # asyncio.wait() rejects bare coroutines on Python 3.11+; pass tasks.
    await asyncio.wait([
        loop.create_task(test('e1', event)),
        loop.create_task(test('e2', event)),
    ])
    print('event end state: {}'.format(event.is_set()))


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
loop.run_until_complete(main(loop))
loop.close()
asyncio 官方文档中已经给出了 2 个很好的「队列例子」,本文就不再重复了。asyncio 同样支持 LifoQueue 和 PriorityQueue,下面我们体验一下 aiohttp + 优先级队列的用法:
import asyncio
import random

import aiohttp

NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
# Shared by every collect_result() call: at most three may hold it at once.
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    """Request ``URL`` formatted with ``a`` and return ``args['a']`` from the JSON reply."""
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    """Return ``fetch_async(a)``, performed while holding the semaphore."""
    # ``with (await sema)`` was removed in Python 3.9; use ``async with``.
    async with sema:
        return await fetch_async(a)


async def produce(queue):
    """Put one ``(num, num)`` tuple on ``queue`` for each entry of ``NUMBERS``."""
    for num in NUMBERS:
        print('producing {}'.format(num))
        # The first tuple element is what the priority queue orders by.
        item = (num, num)
        await queue.put(item)


async def consume(queue):
    """Forever: take an item, fetch its number, print it, mark it done.

    Never returns on its own; ``run()`` cancels it once the queue is drained.
    """
    while 1:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print('consuming {}...'.format(rs))
        queue.task_done()


async def run():
    """Start the consumer, fill the queue, wait for it to drain, stop the consumer."""
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    consumer.cancel()


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
loop.run_until_complete(run())
loop.close()
❯ python3 prioqueue.py
producing 6
producing 4
producing 22
producing 48
producing 9
producing 90
producing 40
consuming 4...
consuming 6...
consuming 9...
consuming 22...
consuming 40...
consuming 48...
consuming 90...
import asyncio


class Task(asyncio.futures.Future):
    """A minimal task: a Future that drives a coroutine to completion.

    The coroutine is advanced one step at a time. Whenever it yields a
    future, the task registers ``_wakeup`` on that future and resumes the
    coroutine once it completes. The coroutine's return value (or raised
    exception) becomes this Future's result (or exception).
    """

    def __init__(self, gen, *, loop):
        """Wrap coroutine ``gen`` and schedule its first step on ``loop``."""
        super().__init__(loop=loop)
        self._gen = gen
        self._loop.call_soon(self._step)

    def _step(self, val=None, exc=None):
        """Advance the coroutine once, sending ``val`` or throwing ``exc``."""
        try:
            # Identity test: only "no exception supplied" selects send().
            if exc is not None:
                f = self._gen.throw(exc)
            else:
                f = self._gen.send(val)
        except StopIteration as e:
            # The coroutine returned; its return value is carried on ``e``.
            self.set_result(e.value)
        except Exception as e:
            self.set_exception(e)
        else:
            if f is None:
                # A bare yield (e.g. ``await asyncio.sleep(0)``) hands control
                # back without a future to wait on: just reschedule the step.
                self._loop.call_soon(self._step)
            else:
                f.add_done_callback(self._wakeup)

    def _wakeup(self, fut):
        """Done-callback for the awaited future: resume with its outcome."""
        try:
            res = fut.result()
        except Exception as e:
            self._step(None, e)
        else:
            self._step(res, None)
如果_step方法没有让协程执行完成,就会添加回调,_wakeup又会继续执行_step... 直到协程程序完成,并set_result。
async def foo():
    """Sleep two seconds, then print a greeting."""
    await asyncio.sleep(2)
    print('Hello Foo')


async def bar():
    """Sleep one second, then print a greeting."""
    await asyncio.sleep(1)
    print('Hello Bar')


# Create the loop explicitly: calling asyncio.get_event_loop() at module level
# with no loop running is deprecated.
loop = asyncio.new_event_loop()
# foo() is driven by the hand-written Task, bar() by the loop's own task type.
tasks = [Task(foo(), loop=loop),
         loop.create_task(bar())]
loop.run_until_complete(
    asyncio.wait(tasks))
loop.close()
❯ python3 task.py
Hello Bar
Hello Foo
import asyncio
from collections import deque


def done_callback(fut):
    """Stop the loop that owns ``fut``; attached to the future being run."""
    fut._loop.stop()


class Loop:
    """A toy event loop: a FIFO queue of ready callbacks and nothing else.

    It has no timers and no I/O polling, so it can only run coroutines that
    never need to sleep or wait on a socket.
    """

    def __init__(self):
        # Entries are (callback, args, context) tuples.
        self._ready = deque()
        self._stopping = False

    def create_task(self, coro):
        """Wrap ``coro`` in an asyncio Task bound to this loop."""
        Task = asyncio.tasks.Task
        task = Task(coro, loop=self)
        return task

    def create_future(self):
        """Return a new Future bound to this loop (``asyncio.wait`` needs one)."""
        return asyncio.futures.Future(loop=self)

    def run_until_complete(self, fut):
        """Run until ``fut`` is done and return its result.

        Raises whatever exception ``fut`` finished with, or RuntimeError if
        the loop was stopped before ``fut`` completed.
        """
        tasks = asyncio.tasks
        # Turn a coroutine into a task (scheduling its first step on _ready).
        fut = tasks.ensure_future(
            fut, loop=self)
        # Stop the loop as soon as the future completes.
        fut.add_done_callback(done_callback)
        try:
            # Run every scheduled callback until stop() is requested.
            self.run_forever()
        finally:
            # Detach the stop callback again.
            fut.remove_done_callback(done_callback)
        if not fut.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return fut.result()

    def run_forever(self):
        """Process ready callbacks until ``stop()`` is called.

        Spins without blocking while the queue is empty.
        """
        # Register as the running loop so asyncio.get_running_loop() (used by
        # asyncio.wait and by task bookkeeping) resolves to this object.
        asyncio.events._set_running_loop(self)
        try:
            while 1:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            asyncio.events._set_running_loop(None)

    def call_soon(self, cb, *args, context=None):
        """Queue ``cb(*args)`` for the next iteration.

        asyncio's Task and Future pass ``context`` as a keyword when they
        schedule callbacks, so it must be accepted here.
        """
        self._ready.append((cb, args, context))

    def _run_once(self):
        """Run the callbacks that were queued when this iteration began."""
        # Snapshot the length: callbacks queued during this pass wait for the
        # next pass.
        ntodo = len(self._ready)
        for i in range(ntodo):
            t, a, ctx = self._ready.popleft()
            if ctx is None:
                t(*a)
            else:
                ctx.run(t, *a)

    def stop(self):
        """Ask ``run_forever`` to exit after the current iteration."""
        self._stopping = True

    def close(self):
        """Drop any callbacks still queued."""
        self._ready.clear()

    def call_exception_handler(self, c):
        """Ignore error reports passed to the loop."""
        pass

    def get_debug(self):
        """Debug mode is always off."""
        return False
async def foo():
    """Print a greeting and finish immediately."""
    print('Hello Foo')


async def bar():
    """Print a greeting and finish immediately."""
    print('Hello Bar')


# Drive both coroutines on the hand-written loop.
loop = Loop()
tasks = [loop.create_task(coro) for coro in (foo(), bar())]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
❯ python3 loop.py
Hello Foo
Hello Bar
PS:本文全部代码可以在「微信公众号文章代码库项目」( https://github.com/dongweiming/mp/tree/master/2016-12-20 ) 中找到。
广告时间:欢迎参与我的首次知乎 Live:「Python 工程师的入门和进阶」( https://www.zhihu.com/lives/789840559912009728 ),可以通过下面的二维码直接进入: