Tornado异步网络访问

Yeolar   2013-02-09 13:52  

Tornado源码剖析 Tornado核心框架 Tornado工具模块

Tornado中包含了一整套完整而精巧的非阻塞的HTTP服务器和客户端实现。包括基于 epoll 的I/O事件循环机制,和在此基础上的流处理、HTTP客户端和TCP服务器,HTTP服务器见 Tornado核心框架 。异步网络访问框架是Tornado的底层基础。

tornado.ioloop - 事件循环

关键字

select.epoll, select.kqueue, select.select, fcntl.fcntl, os.pipe, os.fdopen, threading.Lock, epoll.register, epoll.modify, epoll.unregister, epoll.poll, signal.signal, signal.setitimer, traceback.format_stack, thread.get_ident, heapq.heappush, heapq.heappop

ioloop 模块提供了一个非阻塞套接字的I/O事件循环。它基于 epoll/kqueue 这两个 就绪改变时通知的事件API ,如果Python版本低于2.6,可以使用Tornado中提供的 epoll 的Python接口封装,如果系统中原生不支持 epoll ,则使用 selectepoll/kqueue 同时支持水平触发和边缘触发,Tornado默认使用水平触发的方式。

一般的应用只实例化一个 IOLoop 对象,通过 instance 方法,一些特殊的应用,比如多个线程或者测试,会使用多个 IOLoop

IOLoop 中有四个数据集, _events_handlers 保存I/O事件和对应的处理器, _callbacks_timeouts 保存(超时)回调。

对处理器的修改使用:

 1 def add_handler(self, fd, handler, events):
 2     self._handlers[fd] = stack_context.wrap(handler)
 3     self._impl.register(fd, events | self.ERROR)
 4 
 5 def update_handler(self, fd, events):
 6     self._impl.modify(fd, events | self.ERROR)
 7 
 8 def remove_handler(self, fd):
 9     self._handlers.pop(fd, None)
10     self._events.pop(fd, None)
11     try:
12         self._impl.unregister(fd)
13     ...

默认会把 ERROR 添入事件集中。 stack_context.wrap 的作用见 异步回调的异常处理

添加回调可以通过:

 1 def add_timeout(self, deadline, callback):
 2     timeout = _Timeout(deadline, stack_context.wrap(callback))
 3     heapq.heappush(self._timeouts, timeout)
 4     return timeout
 5 
 6 def remove_timeout(self, timeout):
 7     timeout.callback = None  # 从堆中删除元素代价较大,通过使其失效的办法来删除
 8 
 9 def add_callback(self, callback):
10     with self._callback_lock:
11         list_empty = not self._callbacks
12         self._callbacks.append(stack_context.wrap(callback))
13     if list_empty and thread.get_ident() != self._thread_ident:
14         # 如果当前不是IOLoop所在的线程,就不知道它是否已经在轮询,因此我们
15         # 在_callbacks之前为空的情况下,唤醒它(结束轮询)
16         self._waker.wake()

_Timeout 是一个带截止时间的回调的类,把它以堆的结构存储,可以以较低代价取得最小的 timeoutadd_callbackIOLoop 中唯一的线程安全的交互方法,可以用于把控制从其他的线程转到 IOLoop 的线程。

下面我们来看看 start 方法:

 1 def start(self):
 2     if self._stopped:
 3         self._stopped = False
 4         return
 5     ...
 6     self._running = True
 7     while True:
 8         poll_timeout = 3600.0   # 轮询超时时间
 9 
10         with self._callback_lock:   # 新增的回调推迟到下次事件循环执行
11             callbacks = self._callbacks
12             self._callbacks = []
13         for callback in callbacks:
14             self._run_callback(callback)
15 
16         if self._timeouts:
17             now = time.time()
18             while self._timeouts:
19                 if self._timeouts[0].callback is None:
20                     heapq.heappop(self._timeouts)   # 删除已经取消的
21                 elif self._timeouts[0].deadline <= now:
22                     timeout = heapq.heappop(self._timeouts)
23                     self._run_callback(timeout.callback)
24                 else:
25                     # 把轮询超时时间设置为刚好到下一个超时回调
26                     seconds = self._timeouts[0].deadline - now
27                     poll_timeout = min(seconds, poll_timeout)
28                     break
29 
30         if self._callbacks:
31             poll_timeout = 0.0  # 如果有新增的回调,让轮询不等待
32 
33         if not self._running:   # 已经通过stop被停止
34             break
35 
36         if self._blocking_signal_threshold is not None:
37             signal.setitimer(signal.ITIMER_REAL, 0, 0)  # 不打扰轮询
38 
39         try:
40             event_pairs = self._impl.poll(poll_timeout) # 轮询
41         ...
42 
43         if self._blocking_signal_threshold is not None:
44             signal.setitimer(signal.ITIMER_REAL,
45                              self._blocking_signal_threshold, 0)
46 
47         self._events.update(event_pairs)    # 添加新增的事件
48         # 每次取一个,避免_events变化造成的影响
49         while self._events:
50             fd, events = self._events.popitem()
51             try:
52                 self._handlers[fd](fd, events)
53             ...
54     self._stopped = False
55     if self._blocking_signal_threshold is not None:
56         signal.setitimer(signal.ITIMER_REAL, 0, 0)

可以看到,主循环中先执行即时回调,然后执行超时回调,调整轮询的等待时间,最后做轮询并对得到的事件进行处理。

通过 stop 可以停止事件循环:

1 def stop(self):
2     self._running = False
3     self._stopped = True
4     self._waker.wake()

可以像下面这样在同步代码中使用异步方法(比如单元测试):

1 ioloop = IOLoop()
2 async_method(ioloop=ioloop, callback=ioloop.stop)
3 ioloop.start()

实际当中可能还需要对 startstop 做一层封装以得到异步方法的结果,可以参考 异步代码的单元测试

IOLoop 还支持用 set_blocking_signal_threshold 方法设置一个阻塞超时的信号。

_Timeout 是一个超时类,可以给它设置一个截止时间和一个回调函数,截止时间支持UNIX时间戳和 datetime.timedelta

还有一个 PeriodicCallback 类,它包含了一个 io_loop 成员,用于周期性回调,可以作为调度器。

考虑到不同的Python版本和不同系统对事件机制的支持不完善,本模块还实现了 _EPoll_KQueue_Select 三个类。

此外值得一提的是 Waker 类。因为 epoll 监听来自描述符的事件,所以唤醒它的一个简单办法就是建立一个管道,让 epoll 监听它,需要唤醒时就往里面写点儿什么。

tornado.iostream - 非阻塞套接字的封装

关键字

collections.deque, socket.fileno, socket.connect, socket.getsockopt, socket.error, socket.recv, socket.send, socket.do_handshake, socket.read, errno, re

iostream 模块是Tornado的另一个底层模块,它是对 socket 的封装,在 ioloop 模块的基础上提供了非阻塞的流处理。

iostream 中对功能的封装层次较多,这是为了能够更灵活地做数据读写处理,满足各种情况的需要。 _read_from_socketsocket.recv 做了简单的包装,可以根据需要在子类中覆盖它,比如在 SSLIOStream 中改为包装 socket.read_read_to_buffer 封装了 _read_from_socket ,加入了异常处理,并把数据保存到 _read_buffer_read_from_buffer 则负责按照给定条件(如 _read_delimiter_read_regex )用 _consume 消费 _read_buffer 中的数据,通过 _run_callback 调用回调函数。在实际使用时,数据读取的接口为 read_until_regexread_untilread_bytesread_until_close ,除了 read_until_close 直接用 _consume_add_io_state 来完成读取之外,其他的三个函数都是通过 _try_inline_read 间接调用 _read_to_buffer_read_from_buffer 来完成的。写入由 write 直接调用 _handle_write ,后者封装了 socket.send

iostream 的最大缓存通过 max_buffer_size 设置,默认为104857600(100MB),从 socket 读取的块大小通过 read_chunk_size 设置,默认为4096(4KB)。写入缓存做了分块处理,块的尺寸限制在128KB。

还有一个小的细节是在 _read_from_buffer_handle_write_consume 时,对数据队列头做了合并处理,来避免过小的分块。

SSLIOStream 继承了 IOStream ,它也给出了实现自定义的流处理类的例子。

iostream 模块的重点是对 socket 的封装和对数据缓存的处理。关于网络套接字的相关知识,可以参考Stevens先生的:

  • Advanced Programming in the UNIX Environment, Addison-Wesley, 1992.
  • UNIX Network Programming, Volume 1, Second Edition: Networking APIs: Sockets and XTI, Prentice Hall, 1998.

tornado.httpclient - 非阻塞HTTP客户端

关键字

weakref.WeakKeyDictionary, calendar.timegm, email.utils.formatdate, httplib.responses.get

Tornado中提供了两种HTTP客户端的实现,一个是阻塞模式的 HTTPClient ,另一个是非阻塞的 AsyncHTTPClient

前者实际上是用后者实现的,单独实例化一个 IOLoopfetch 启动 _io_loop ,并等待异步的 fetch 的完成,然后停止 _io_loop 并返回 responseclose 调用异步的 close 完成资源的回收。

AsyncHTTPClient 的实现有些技巧:它实际上只提供了一个公共接口,通过 __new__ 来实例化它的子类;并且它的实例化实现为一种伪单例的模式,对应每个 IOLoop 实例只实例化一个实例,可以传递 force_instance=True 来解除这种设定。因为它的伪单例模式是将实例关联到了 IOLoop 实例,这就有个 IOLoop 实例的生存期的问题,不应该影响 IOLoop 实例的正常释放,所以这里用到了 weakref.WeakKeyDictionary ,它是键为弱引用的字典。

 1 @classmethod
 2 def _async_clients(cls):    # 返回弱引用键的字典
 3     assert cls is not AsyncHTTPClient, "should only be called on subclasses"
 4     if not hasattr(cls, '_async_client_dict'):
 5         cls._async_client_dict = weakref.WeakKeyDictionary()
 6     return cls._async_client_dict
 7 
 8 def __new__(cls, io_loop=None, max_clients=None, force_instance=False,
 9             **kwargs):
10     io_loop = io_loop or IOLoop.instance()
11     if cls is AsyncHTTPClient:
12         if cls._impl_class is None:
13             from tornado.simple_httpclient import SimpleAsyncHTTPClient
14             AsyncHTTPClient._impl_class = SimpleAsyncHTTPClient
15         impl = AsyncHTTPClient._impl_class
16     else:
17         impl = cls
18     if io_loop in impl._async_clients() and not force_instance:
19         return impl._async_clients()[io_loop]   # 伪单例模式
20     else:
21         instance = super(AsyncHTTPClient, cls).__new__(impl)
22         ...
23         instance.initialize(io_loop, **args)
24         if not force_instance:
25             impl._async_clients()[io_loop] = instance
26         return instance

用于实现 AsyncHTTPClient 的子类可以通过 configure 来进行配置。默认为 SimpleAsyncHTTPClient ,可以配置成如 CurlAsyncHTTPClient

HTTPRequestHTTPResponse 用来对请求和响应作封装,具体的意义参见文档。注意如果使用 SimpleAsyncHTTPClient 实现的话,代理是不可用的。另一个需要提一下的是 connect_timeoutrequest_timeoutconnect_timeout 指建立连接的超时时间, request_timeout 指整个请求的超时时间,它包括建立连接的阶段,所以实际上建立连接的超时时间取的是这两者的最小值。

HTTPError 用来封装HTTP请求的异常。

tornado.simple_httpclient - 基于 IOStream 的HTTP客户端实现

关键字

collections.deque, functools.partial, urlparse.urlsplit, urlparse.urljoin, re.match, socket, socket.getaddrinfo, ssl, base64.b64encode, contextlib.contextmanager, zlib.decompressobj, copy.copy, StringIO

IOStream 处于网络的传输层,而 *HTTPClient 则处于网络的应用层。 SimpleAsyncHTTPClient 基于 IOLoopIOStream 实现了一个非常精巧的HTTP客户端。它的并发客户端数可以通过 max_clients 来设置,默认为10个。连接请求会被放到 queue 里, _process_queue 负责从 queue 里面取出请求,实例化 _HTTPConnection 连接来完成请求。每个连接处理完成后,通过回调 _release_fetch 重新激活 _process_queue 检查是否队列中还有未处理的请求。

 1 def _process_queue(self):
 2     with stack_context.NullContext():
 3         while self.queue and len(self.active) < self.max_clients:
 4             request, callback = self.queue.popleft()    # 取出待处理请求
 5             key = object()
 6             self.active[key] = (request, callback)
 7             _HTTPConnection(self.io_loop, self, request,    # 发起请求
 8                             functools.partial(self._release_fetch, key),
 9                             callback,
10                             self.max_buffer_size)

_HTTPConnection 完成了对HTTP协议的处理。它可以处理http和https两种协议,并且支持IPv6,https连接只在包含ssl库的情况下支持(Python 2.6+)。对https的验证通过 request 的参数 validate_certca_certsclient_keyclient_cert 来指定,默认会使用ca-certificates.crt证书,关于CA认证,可以参考 Certificates Technical Reference 的系列文章。关于SSL协议版本的选择问题,可以参考 SSL Survey: How many sites support TLS 1.1 and better?

__init__ 中完成了http和https的解析,同时它还支持host映射,用来替换修改/etc/hosts的需要。host映射通过 *HTTPClienthostname_mapping 来配置。最后,它发起请求,并将回调函数设为 _on_connect 。请求的处理和发送实际发生在 _on_connect 中,请求被构建成HTTP协议标准的数据格式,发送出去,目前还不支持代理。请求返回后,在 _on_headers 中完成对头部的解析。它支持数据的gzip压缩,用zlib库解压gzip数据的方法参考 How can I decompress a gzip stream with zlib? 。此外,还支持数据分块。对实体数据的处理由 _on_body 完成,如果数据被分块,则先由 _on_chunk_length 重组数据。是否分块的一个影响是 request.streaming_callback 是以块为单位做回调的,所以如果分块,该回调会多次执行。 _on_body 会对重定向(包括301、302、303、307)重新构建请求,调用 client.fetch 重新请求,并关闭当前流。如果没有重定向, _on_body 负责构建 response ,回调 final_callback ,并关闭流。

_HTTPConnection 中出现了四种回调:来自请求的 header_callback ,在完成头部处理后调用;来自请求的 streaming_callback ,在数据解析时调用; _release_callback 用于满足 SimpleAsyncHTTPClient 的队列管理需要; final_callback 则是发起请求的用户代码的回调函数。

关于HTTP协议的更多相关内容,可以参考 HTTP: The Definitive Guide, O'Reilly, 2002。

tornado.curl_httpclient - 基于 pycurl 的HTTP客户端实现

关键字

pycurl, pycurl.CurlMulti, pycurl.CurlMulti.{setopt, socket_action, socket_all, timeout, info_read}, pycurl.Curl, pycurl.Curl.{setopt, unsetopt, getinfo}, collections.deque, StringIO, threading.activeCount

关于 CurlAsyncHTTPClient ,这里只简单解释一下。和 SimpleAsyncHTTPClient 相比,它支持的特性更多(包括HTTP代理等),速度更快,兼容性更好。所以在有libcurl的情况下,可以考虑使用它替代默认的 SimpleAsyncHTTPClient

CurlAsyncHTTPClient 默认也是10个客户端,但它还有并发连接的概念(libcurl内建),默认为5,可以通过 max_simultaneous_connections 来设置。当前空闲的curl客户端由 _free_list 维护。

 1 def _process_queue(self):
 2     with stack_context.NullContext():
 3         while True:
 4             started = 0
 5             while self._free_list and self._requests:
 6                 started += 1
 7                 curl = self._free_list.pop()    # 取空闲curl实例
 8                 (request, callback) = self._requests.popleft()  # 取出待处理请求
 9                 ...
10                 _curl_setup_request(curl, request, curl.info["buffer"],
11                                     curl.info["headers"])
12                 self._multi.add_handle(curl) # 向CurlMulti实例中添加Curl实例
13             if not started:
14                 break

tornado.netutil - 一些网络工具

关键字

socket, socket.fileno, socket.getaddrinfo, socket.socket, socket.setsockopt, socket.setblocking, socket.bind, socket.listen, socket.accept, socket.close, socket.error, ssl.wrap_socket

netutil 模块提供了一个单线程非阻塞模式的TCP服务器的实现。有关服务器的实现模式可以参考 The C10K Problem 中关于I/O策略的介绍。

netutil 中定义了关于套接字的两个封装函数: bind_sockets 用于创建指定端口的监听套接字,可以指定监听的地址/主机名,否则监听所有的网络接口。 add_accept_handler 用于将 socket.accept 的处理器注册到 IOLoop ,对接收到的请求做回调。

利用以上两个封装函数, TCPServer 实现了一个TCP服务器。 listen 添加监听的端口和地址,它可以重复添加,监听多个端口。 listen 实际调用 bind_socketsadd_socketsadd_sockets 调用了 add_accept_handler

 1 def listen(self, port, address=""):
 2     sockets = bind_sockets(port, address=address)
 3     self.add_sockets(sockets)
 4 
 5 def add_sockets(self, sockets):
 6     ...
 7     for sock in sockets:
 8         self._sockets[sock.fileno()] = sock
 9         add_accept_handler(sock, self._handle_connection,
10                            io_loop=self.io_loop)

TCPServer 中还提供了 bind 封装 bind_sockets ,和 listen 的区别是它把 add_sockets 分离到 start 中,这样可以在 start 中用 process.fork_process 来创建多个进程。

 1 def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128):
 2     sockets = bind_sockets(port, address=address, family=family,
 3                            backlog=backlog)
 4     if self._started:
 5         self.add_sockets(sockets)
 6     else:
 7         self._pending_sockets.extend(sockets)   # 在多进程启动前,暂存
 8 
 9 def start(self, num_processes=1):
10     assert not self._started
11     self._started = True
12     if num_processes != 1:
13         process.fork_processes(num_processes)   # 启动多进程
14     sockets = self._pending_sockets
15     self._pending_sockets = []
16     self.add_sockets(sockets)                   # accept

当然,也可以直接用 bind_socketsadd_sockets 组合来启动服务器。所以,像文档中说明的那样,有三种启动方式:

  1. listen 单进程:

    1 server = TCPServer()
    2 server.listen(8888)
    3 IOLoop.instance().start()
    
  2. bind/start 多进程:

    1 server = TCPServer()    # 不能传递IOLoop
    2 server.bind(8888)
    3 server.start(0)  # 创建多进程
    4 IOLoop.instance().start()
    

    注意不能传递 IOLoopTCPServer ,因为 IOLoop 不能被进程间共享。如果想对多进程做更多的控制,可以用下面的第3种方法。

  3. add_sockets 更高级的多进程模式:

    1 sockets = bind_sockets(8888)
    2 tornado.process.fork_processes(0)   # 创建多进程
    3 server = TCPServer()    # 可以传递IOLoop
    4 server.add_sockets(sockets)
    5 IOLoop.instance().start()
    

    add_sockets 的方法也可以用在单进程模式下。

TCPServer_handle_connection 作为 add_accept_handler 的回调函数,它会对接收到的连接请求用 IOStream 建立连接,并调用 handle_stream 处理数据流。 handle_stream 的实现留给继承 TCPServer 的应用层实现来完成。 TCPServer 也支持ssl连接。

http://www.yeolar.com/note/2013/02/09/tornado-async-networking/

http://www.yeolar.com/note/2013/02/09/tornado-async-networking/