tornado 中的异步调用模式

fundamental concepts

同步

调用者等待调用结果的返回

异步

调用者不等待调用结果的返回,而是通过间隔轮询、通知等方式得知结果

阻塞

调用方式影响后续指令的执行

非阻塞

调用方式不影响后续指令的执行


一般来说,阻塞是绝对的,非阻塞则是相对的,因为任何指令或调用的执行都要占用 CPU 周期,或网络,或 IO。非阻塞只是说调用或者资源消耗不影响后续逻辑执行,他们经得起等待。非阻塞往往和异步一起出现。

tornado 是一个异步非阻塞 httpserver,同时也是一个 web framework。

Tornado is a Python web framework and asynchronous networking library, originally developed at FriendFeed. By using non-blocking network I/O, Tornado can scale to tens of thousands of open connections, making it ideal for long polling, WebSockets, and other applications that require a long-lived connection to each user。

在 tornado 中异步和非阻塞是通过 ioloop 和 Future 实现的,Future 是借助 python 的协程来实现非阻塞调用。tornado 提供了不同的异步调用形式来适配不同的调用场景,本文的目的就是为了说明不同形式是如何使用的以及它们的差别。

为了演示方便,这里使用了一个公共模块 util.py 来提供 ioloop 的启动和销毁,下文不做特殊说明皆是如此。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# util.py 
import tornado.ioloop

COUNTER = 0

def stop_loop(times):
global COUNTER
COUNTER += 1
if COUNTER == times:
tornado.ioloop.IOLoop.instance().stop()
print('====> ioloop end')


def start_loop():
print('====> ioloop start')
ioloop = tornado.ioloop.IOLoop.current()
ioloop.start()

最常用的调用方式

在 tornado 中最普遍的使用方式是把函数的调用结果封装成 Future,利用 ioloop 执行并异步获得结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen

from util import stop_loop, start_loop


# fetch 返回的是 future
def asyn_fetch_future(url):
http_client = AsyncHTTPClient()
return http_client.fetch(url)


def asyn_fetch_future_callback(future):
result = future.result()
print('future_callback')
print(result.request.url, result.code, result.reason, result.request_time)
stop_loop(1)


if __name__ == '__main__':
result_future = asyn_fetch_future('http://www.apple.com/cn/') #1
result_future.add_done_callback(asyn_fetch_future_callback)
start_loop()

#1 处通过tornado 提供的异步 httpclient 获得一个 Future,为 Future 添加执行结果回调函数获得执行结果,Future 的执行结果也是一个 Future。

输出结果如下

1
2
3
4
5
====> ioloop start
future_callback
('http://www.apple.com/cn/', 200, 'OK', 0.5546278953552246)
====> ioloop end
0.71 real 0.06 user 0.06 sys

这种调用方式必须手动启动 ioloop 并在调用结束之后手动销毁 ioloop,想要获得调用结果必须为 Future 添加回调。

只关心调用,不在乎结果

有时候我们并不在乎函数的调用结果,只要函数正确执行即可,ioloop 提供了 spawn_callback 来执行这样的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop

from util import start_loop, stop_loop


@tornado.gen.coroutine
def divide(x, y):
return x / y


if __name__ == '__main__':
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
tornado.ioloop.IOLoop.current().spawn_callback(divide, 1, 0)
tornado.ioloop.IOLoop.current().add_timeout(
tornado.ioloop.time.time() + 1, stop_loop, 1)
start_loop()

借助 spawn_callback 可以直接执行函数调用,但是依然需要手动处理 ioloop 的开闭。

一次性执行

run_sync 可以自动开启 ioloop 并在函数执行结束之后关闭 ioloop,此种调用在执行一次性操作时非常有用,比如要对数据库进行一次性修改:

1
2
3
4
5
6
7
8
9
10
@tornado.gen.coroutine
def init_tag():
tb_tag = Tag() #1
for i in range(10):
result = yield tb_tag.insert({'name': random.choice('abcdefghjklpoiuytrewq')})
print(result)


if __name__ == '__main__':
tornado.ioloop.IOLoop.current().run_sync(lambda: init_tag())

#1 是一个 motor Mongodb collection,借助 run_sync 可以非常方便对数据库执行操作,由于run_sync 只接受一个参数,如果函数调用需要传参,可以把函数封装成 lambda。

使用 callback*

如果调用需要与 callback 函数进行交互,可以利用 gen.task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# -*- coding:utf-8 -*-
import tornado.gen
import tornado.ioloop

from util import start_loop, stop_loop


@tornado.gen.coroutine
def call_task():
result = yield tornado.gen.Task(some_function, 1, 2)
raise tornado.gen.Return(result)


def fetch_coroutine_callback(future):
print('coroutine callback ==> ', future.result())
stop_loop(1)


def some_function(x, y, callback=None):
print('some_function called')
callback(x * y)


if __name__ == '__main__':
future = call_task()
future.add_done_callback(fetch_coroutine_callback)
start_loop()

使用 gen.task 可以直接返回一个 Future ,被调用的函数会自动增加一个 callback 函数,用来在函数执行结束之后把结果进行回调通知。

通过 threadPool 来调用阻塞操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#-*- coding:utf-8 -*-
from tornado.concurrent import Future
from tornado import gen
import time
from concurrent.futures import ThreadPoolExecutor

from util import start_loop, stop_loop


EXECUTOR = ThreadPoolExecutor(max_workers=4)


def fetch_coroutine_callback(future):
print('coroutine callback')
print(future.result())
stop_loop(2)


def sleep_func(t):
print('sleep_func call')
time.sleep(t)
return 'blocking func result'


@gen.coroutine
def blocking():
result = yield EXECUTOR.submit(sleep_func, *(4, ))
raise gen.Return(result)


@gen.coroutine
def not_blocking():
future = Future()
future.set_result('not blocking func result')
result = yield future
raise gen.Return(result)


if __name__ == '__main__':
"""
用多线程的方式借助协程实现异步调用
"""
blocking().add_done_callback(fetch_coroutine_callback)
not_blocking().add_done_callback(fetch_coroutine_callback)
start_loop()

如果代码中有阻塞操作,可以借助 ThreadPoolExecutor 来完成异步的调用,这样把耗时的操作交给别的线程,可以使当前的线程继续执行后续的操作。这里的阻塞操作一般是 IO 或者其他系统调用。

并行执行

tornado 的 Future 是支持并行执行的,可以对多个 future 进行 yield 操作并返回多个结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# -*- coding:utf-8 -*-
from tornado.httpclient import AsyncHTTPClient
from tornado.concurrent import Future
from tornado import gen

from util import stop_loop, start_loop


@gen.coroutine
def fetch_many_coroutine(urls):
http_client = AsyncHTTPClient()
response1, response2 = yield [http_client.fetch(urls[0]), http_client.fetch(urls[1])]
raise gen.Return([response1, response2])

@gen.coroutine
def fetch_coroutine(urls):
http_client = AsyncHTTPClient()
responses = yield [http_client.fetch(url) for url in urls]
raise gen.Return(responses)


def fetch_coroutine_callback(future):
print('coroutine callback')
for result in future.result():
print(
result.request.url,
result.code, result.reason, result.request_time)
stop_loop(2)


if __name__ == '__main__':
"""
使用gen.coroutine 可以很方便让包含 yield 的函数返回future
"""
result_future = fetch_coroutine(['https://baidu.com', 'https://baidu.com'])
result_future.add_done_callback(fetch_coroutine_callback)
result_future = fetch_many_coroutine(['https://baidu.com', 'https://baidu.com'])
result_future.add_done_callback(fetch_coroutine_callback)
start_loop()

文中所有代码见 tornado-asyn,欢迎指正。

三月沙 wechat
扫描关注 wecatch 的公众号