Tornado入门(三)【协程】
协程
在Tornado中,协程是推荐使用的异步方式。协程使用yield
关键字暂停或者恢复执行,而不是回调链的方式。
协程跟异步代码一样简单,但是没有使用线程的损耗,通过减少上下文切换的次数,可以让并发更为简单。
示例:
from tornado import gen@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
# In Python versions prior to 3.3, returning a value from
# a generator is not allowed and you must use
#raise gen.Return(response.body)
# instead.
return response.body
async和await
Python3.5中引入了关键字
async
和await
,使用这些关键字的函数也称之为本地协程。从Tornado4.3开始,我们可以使用它们来替换基于yield的协程。只需要使用async def foo()
替换函数定义中的@gen.coroutine
修饰器,使用await
替换函数中的yield
即可。在后面的文档中,我们将继续使用yield
风格,以便兼容老的Python版本。但是如果使用新版Python的话,还是推荐使用async
和await
,因为它们运行速度更快。async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
await
的功能没有yield
那么多,例如,在基于yield
的协程中,你可以yield
一组Future
组成的列表,但是在本地协程中,你必须将列表包裹在tornado.gen.multi
中。为了方便,Tornado提供了函数tornado.gen.convert_yielded
将任意的yield
对象转换成适用于await
的对象。async def f():
executor = concurrent.futures.ThreadPoolExecutor()
await tornado.gen.convert_yielded(executor.submit(g))
本地协程不依赖于任何框架,并不是所有协程都是互相兼容的。当第一个协程被调用的时候,它会选择一个协程执行器,这个执行器接下来会被所有通过
await
调用的协程所共享。Tornado的协程执行器被设计为多功能的,它可以接收任意框架提供的awaitable
对象。其他框架的协程执行器则受到这种限制,例如asyncio
的协程执行器。由于这个原因,当需要同时使用多个框架的时候,推荐使用Tornado的协程执行器。如果需要调用一个已经被asyncio
执行器调用的协程,可以使用tornado.platform.asyncio.to_asnycio_future
适配器。工作原理
当函数中包含
yield
关键字时,称该函数为生成器。所有的生成器都是异步的,当调用的时候,返回的是一个生成器对象而不是计算结果。修饰器@gen.coroutine
通过yield
表达式与生成器通信,调用协程之后,返回一个Future
对象。下面是协程修饰器的简化版实现:
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
修饰器从生成器中接收一个
Future
对象,等待Future
执行完,然后解包Future
对象,将结果发送给生成器,作为yield
的结果。大部分代码都不会直接接触到Future
,除非将异步函数返回的Future
传递给yield
表达式。调用协程
协程抛出异常的方式与普通的不一样:所有的异常都会困在
Future
中,直到它被yield
。这也就意味着所有的协程都必须被合理的调用,否则部分错误可能没有被发现。@gen.coroutine
def divide(x, y):
return x / ydef bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
不管什么情况下,所有调用协程的函数本身也必须是协程,并且在调用中使用
yield
关键字。当重载父类的方法时,要注意查看是否允许使用协程。@gen.coroutine
def good_call():
# yield will unwrap the Future returned by divide() and raise
# the exception.
yield divide(1, 0)
有时我们可能只想触发一个事件,而不等待结果返回,这种情况下,可以使用
IOLoop.spawn_callback
函数,这个函数会使用IOLoop
来处理调用函数,如果调用失败,则记录一条堆栈信息。# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
当使用
@gen.coroutine
时,推荐使用IOLoop.spawn_callback
;如果是使用async def
则必须使用IOLoop.spawn_callback
,否则协程执行器不会运行。最后,在程序级别,如果
IOLoop
没有运行,则需要先启动IOLoop
,然后运行协程,最后使用IOLoop.run_sync
来停止IOLoop
。# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
协程模式
【Tornado入门(三)【协程】】与回调函数交互
为了与使用回调的异步函数交互,需要将回调包裹在
Task
对象中,它会返回一个Future
对象。@gen.coroutine
def call_task():
# Note that there are no parens on some_function.
# This will be translated by Task into
#some_function(other_args, callback=callback)
yield gen.Task(some_function, other_args)
调用阻塞函数
调用阻塞函数最简单的方式就是通过使用
ThreadPoolExecutor
,它返回一个匹配协程的Future
对象。thread_pool = ThreadPoolExecutor(4)@gen.coroutine
def call_blocking():
yield thread_pool.submit(blocking_func, args)
并行
协程修饰器可以识别元素内容为
Future
的列表和字典,并等待所有的Future
执行完。@gen.coroutine
def parallel_fetch(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]@gen.coroutine
def parallel_fetch_many(urls):
responses = yield [http_client.fetch(url) for url in urls]
# responses is a list of HTTPResponses in the same order@gen.coroutine
def parallel_fetch_dict(urls):
responses = yield {url: http_client.fetch(url)
for url in urls}
# responses is a dict {url: HTTPResponse}
交错执行
有时候,可能需要先保存一个
yield
对象,而不是立即返回:@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
上面的模式只适用于
@gen.coroutine
,如果fetch_next_chunk()
使用async def
。则需要通过以下方式调用:fetch_future = tornado.gen.convert_yielded(self.fetch_next_chunk())
。循环
在协程中实现循环略微诡异,因为在捕获循环中的
yield
结果根本做不到,所以需要将循环条件与获取结果分开来,例如这个来自Motor的例子。import motor
db = motor.MotorClient().test@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
后台运行
协程中通常很少使用周期调度,不过协程可以通过
while True:
循环和tornado.gen.sleep
来实现。@gen.coroutine
def minute_loop():
while True:
yield do_something()
yield gen.sleep(60)# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
上面的例子中,每个循环实际是每隔
60+N
秒执行一次的,N
是do_something()
的执行时间,为了实现精确的每60秒执行一次,可以使用前面介绍的交错模式:@gen.coroutine
def minute_loop2():
while True:
nxt = gen.sleep(60)# Start the clock.
yield do_something()# Run while the clock is ticking.
yield nxt# Wait for the timer to run out.
推荐阅读
- 三生记
- 阿潘三言两语之20180531二比一
- 陪读记(三)
- Day8|Day8 | 遇见Candy(三)
- 快速入门python看过的一些资料
- python|python 爬虫第三天 小项目加扩展无id 重复cssname获取数据与selenium操作
- 【计算讲谈社】第六讲|三星堆奇幻之旅(只有云计算才能带来的体验)
- 区块链合约安全系列(三)(如何认识及预防公链合约中的自毁攻击)
- 未来简史
- 吾日三省|吾日三省 1110 认知能力