# 2019-05-21
import datetime
import heapq # 堆模块
import types
import time
class Task:
    """Scheduler entry pairing a coroutine with the time it waits for.

    Instances compare by ``waiting_until`` only, which lets them be pushed
    onto a ``heapq`` heap so the earliest wake-up time is popped first.
    """

    def __init__(self, wait_until, coro):
        """Remember ``coro`` and the ``wait_until`` value it is waiting for."""
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        """Return True when both tasks share the same wake-up time."""
        if not isinstance(other, Task):
            # Let Python fall back to its default handling for foreign types.
            return NotImplemented
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        """Return True when this task wakes up before ``other``."""
        if not isinstance(other, Task):
            return NotImplemented
        return self.waiting_until < other.waiting_until
class SleepingLoop:
    """Minimal event loop for generator coroutines that yield wake-up times.

    Each coroutine is expected to yield the ``datetime`` at which it wants to
    be resumed; the loop sleeps until then and sends the current time back in.
    """

    def __init__(self, *coros):
        """Store the coroutines to run; nothing is started until the loop runs."""
        self._new = coros
        # Heap of Task objects, ordered by the time each one wants to wake up.
        self._waiting = []

    def run_until_complete(self):
        """Drive every coroutine until all of them have finished."""
        # Prime each coroutine so it runs up to its first yield.
        for coro in self._new:
            try:
                wait_for = coro.send(None)
            except StopIteration:
                # The coroutine finished without ever asking to sleep.
                continue
            heapq.heappush(self._waiting, Task(wait_for, coro))

        # Keep resuming whichever coroutine is due next until none remain.
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                # Nothing else can be due earlier, so block until this one is.
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            print('*' * 50)
            try:
                wait_until = task.coro.send(now)
            except StopIteration:
                # The coroutine is done; do not reschedule it.
                continue
            print('-' * 50)
            heapq.heappush(self._waiting, Task(wait_until, task.coro))
def sleep(seconds):
    """Generator that asks its driver to resume it ``seconds`` from now.

    Yields the ``datetime`` at which it wants to be woken up, and expects the
    driver to send back a ``datetime`` when it resumes the generator.

    Returns (through ``StopIteration``) the difference between the value sent
    in and the time at which the generator was first started.
    """
    now = datetime.datetime.now()
    wait_until = now + datetime.timedelta(seconds=seconds)
    print('before yield wait_until')
    # Hand the requested wake-up time (a datetime) to whoever drives us.
    actual = yield wait_until
    print('after yield wait_until')
    return actual - now
def countdown(label, length, *, delay=0):
    """Generator coroutine that counts down from ``length`` once per second.

    Args:
        label: Prefix printed on every line of output.
        length: Number to count down from; values <= 0 skip the countdown.
        delay: Seconds to wait (via ``sleep``) before the countdown starts.
    """
    print(label, 'waiting', delay, 'seconds before starting countdown')
    delta = yield from sleep(delay)
    print(label, 'starting after waiting', delta)
    # ``> 0`` rather than plain truthiness so a negative length cannot loop forever.
    while length > 0:
        print(label, 'T-minus', length)
        yield from sleep(1)
        length -= 1
    print(label, 'lift-off!')
def main():
    """Run three staggered countdowns on a SleepingLoop and print the total time."""
    loop = SleepingLoop(
        countdown('A', 5),
        countdown('B', 3, delay=2),
        countdown('C', 4, delay=1),
    )
    start = datetime.datetime.now()
    loop.run_until_complete()
    print('Total elapsed time is', datetime.datetime.now() - start)


if __name__ == '__main__':
    main()
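# `async def` defines an asynchronous (coroutine) function. `yield from` is never allowed inside an `async def` body; a bare `yield` is a syntax error in Python 3.5 and, from Python 3.6 on, turns the function into an asynchronous generator instead of a coroutine.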
# Example 1
async def download(url):  # the `async` keyword turns an ordinary function into a coroutine function
    """Coroutine that ignores ``url`` and immediately returns 'eva'."""
    return 'eva'
# Calling a coroutine function only creates a coroutine object; nothing runs yet.
ret = download('http://www.baidu.com/')
print(ret)  # prints the coroutine object's repr
try:
    ret.send(None)
except StopIteration as exc:
    # A coroutine that finishes reports its return value through StopIteration.
    # Catch it here so the rest of the file keeps running.
    print('StopIteration:', exc.value)  # StopIteration: eva
# Example 2
async def download(url):
    """Coroutine that ignores ``url`` and immediately returns 'eva'."""
    return 'eva'
def run(coroutine):
    """Advance ``coroutine`` one step and return its result if it finished.

    Sends ``None`` once. If that raises ``StopIteration`` the value carried by
    the exception (the coroutine's return value) is returned. If the coroutine
    merely suspends, the function falls through and returns ``None``.
    """
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value
# Create the coroutine object, drive it with run(), and print what run() returned.
coro = download('http://www.baidu.com/')
ret = run(coro)
print(ret)
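# Because an `async def` coroutine cannot use `yield` to suspend itself, the generator `downloader` is wrapped with the `types.coroutine` decorator so that it can be awaited.
# The `await` operator must be followed by an awaitable object (typically a task that performs I/O), and it may only be used inside an `async def` function.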
# Example 3
import types
@types.coroutine  # makes the generators produced by this function awaitable
def downloader(url):
    """Generator-based coroutine that ignores ``url`` and yields 'aaa' once."""
    yield 'aaa'
async def download_url(url):  # coroutine function
    """Await ``downloader(url)`` and return whatever that await evaluates to.

    Whatever ``downloader`` yields is passed up to the code driving this
    coroutine with ``send()``; ``html`` receives the generator's return value.
    """
    waitable = downloader(url)
    print(waitable)  # a generator object
    html = await waitable
    return html
# Create the coroutine object and advance it by hand to its first suspension point.
coro = download_url('http://www.baidu.com')
print(coro)  # prints the coroutine object's repr
ret = coro.send(None)  # send() returns the value yielded from inside the awaited generator
print(ret)
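# asyncio is a standard-library module introduced in Python 3.4 that provides built-in support for asynchronous I/O.
# Its programming model is an event (message) loop: obtain a reference to an EventLoop from the asyncio module, then hand the coroutines to be executed over to that loop, and asynchronous I/O is achieved.
# coroutine + yield from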
import asyncio
# NOTE: asyncio.coroutine was deprecated in Python 3.8 and removed in Python 3.11.
# It is kept here because this snippet demonstrates the legacy generator-based
# style; on Python 3.11+ use the `async def` / `await` form instead.
@asyncio.coroutine
def hello(delay=1):
    """Legacy generator-based coroutine: print, wait ``delay`` seconds, print again."""
    print("Hello world!")
    # Asynchronously wait by delegating to asyncio.sleep().
    yield from asyncio.sleep(delay)
    print("Hello again!")
# Create a dedicated event loop. asyncio.get_event_loop() would hand back the
# same, already closed, loop once any earlier snippet has called loop.close().
loop = asyncio.new_event_loop()
try:
    # Run the coroutine until it completes.
    loop.run_until_complete(hello())
finally:
    loop.close()
# async + await
import asyncio
async def hello(delay=1):
    """Print a greeting, wait ``delay`` seconds without blocking, print again."""
    print("Hello world!")
    # Asynchronously wait via asyncio.sleep().
    await asyncio.sleep(delay)
    print("Hello again!")
# Create a dedicated event loop. asyncio.get_event_loop() would hand back the
# same, already closed, loop once any earlier snippet has called loop.close().
loop = asyncio.new_event_loop()
try:
    # Run the coroutine until it completes.
    loop.run_until_complete(hello())
finally:
    loop.close()
# Running multiple tasks
import asyncio
async def hello(delay=1):
    """Print a greeting, wait ``delay`` seconds, print again and return 'done'."""
    print("Hello world!")
    await asyncio.sleep(delay)
    print("Hello again!")
    return 'done'
# Use a fresh loop: get_event_loop() may return one that was already closed.
loop = asyncio.new_event_loop()
try:
    # asyncio.wait() rejects bare coroutine objects since Python 3.11
    # (deprecated in 3.8), so wrap each coroutine in a task first.
    hello_tasks = [loop.create_task(hello()) for _ in range(2)]
    loop.run_until_complete(asyncio.wait(hello_tasks))
finally:
    loop.close()
# Getting the return value
import asyncio
async def hello(delay=1):
    """Print a greeting, wait ``delay`` seconds, print again and return 'done'."""
    print("Hello world!")
    await asyncio.sleep(delay)
    print("Hello again!")
    return 'done'
# Use a fresh loop (get_event_loop() may return one that was already closed)
# and make sure it is closed again once the task has finished.
loop = asyncio.new_event_loop()
try:
    task = loop.create_task(hello())
    loop.run_until_complete(task)
    # A finished task exposes the coroutine's return value through result().
    ret = task.result()
finally:
    loop.close()
print(ret)
# Running multiple tasks and collecting their return values
import asyncio
async def hello(i):
    """Print a greeting, wait ``i`` seconds, print again and return ('done', i)."""
    print("Hello world!")
    await asyncio.sleep(i)
    print("Hello again!")
    return 'done', i
# Use a fresh loop (get_event_loop() may return one that was already closed)
# and make sure it is closed again once both tasks have finished.
loop = asyncio.new_event_loop()
try:
    task1 = loop.create_task(hello(2))
    task2 = loop.create_task(hello(1))
    task_l = [task1, task2]
    # asyncio.wait() completes once every task in the list is done.
    tasks = asyncio.wait(task_l)
    loop.run_until_complete(tasks)
finally:
    loop.close()
# Finished tasks keep their results, so they can be read after the loop closes.
for t in task_l:
    print(t.result())
(0)