# epoll和事件循环模型

# 事件循环建模

简而言之,我们需要实现一个 事件循环 ( Event Loop ),它内部有一个 可执行 ( Runnable ) 协程队列:

图片描述

事件循环是一个永久循环,每次循环时它先调度可执行队列里的每个协程 —— 即从队列中取出一个可执行协程,然后调用 send 方法驱动它执行:

图片描述

协程执行的结果可分为两种不同情况。其一,协程没有遇到 IO 操作,一把梭哈到底并最后退出。这时, send 方法抛 StopIteration 异常通知调用者:

图片描述

其二,协程需要进行 IO 操作,这时它应该通过 yield 让出执行权,并将 IO 操作上下文提交给事件循环。IO 操作由事件循环负责执行,操作上下文必须记录协程信息:

图片描述

可执行队列处理完毕后,得到成一个个 IO 操作上下文,事件循环负责将它们注册到 epoll ,以便订阅 IO 事件:

图片描述
接着,事件循环通过 epoll 等待 IO 事件到达。当某个 IO 操作就绪时,事件循环将把对应协程重新放入可执行队列。假设协程 3 等待的 IO 操作已经就绪,epoll 将返回对应 IO 事件,执行 IO 处理函数并将协程放回可执行队列重新调度:

图片描述

事件循环处理完所有 epoll 事件后,将进入下一次循环。这时,又开始处理可执行队列,周而复始。

# epoll

由于事件循环需要同时关注多个 IO 操作,因此需要采用 IO 多路复用 技术。epoll 是 Linux 下的 IO 多路复用技术,很有代表性。接下来以 epoll 为例,简单探讨一下。

服务器应用一般需要通过 套接字 ( socket ) 监听某个端口,等待客户端连接。这个函数用于创建一个监听套接字:

from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

def create_listen_socket(bind_addr='0.0.0.0', bind_port=55555, backlogs=102400):
    # 创建套接字
    sock = socket(AF_INET, SOCK_STREAM)
    # 设置地址复用选项
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    # 绑定监听地址和端口
    sock.bind((bind_addr, bind_port))
    # 开始监听
    sock.listen(backlogs)
  
    return sock

以默认参数创建一个监听套接字,并调用 accept 方法接受客户端连接:

>>> s = create_listen_socket()
>>> s.accept()

accept 调用将 阻塞 ,直到有客户端连接上来才会返回。现在,我们通过 telnet 命令模拟客户端连接:

$ telnet 127.0.0.1 55555

当客户端连上来后,accept 调用就返回了,返回值是一个元组。元组包含一个与客户端通讯的套接字,以及客户端的地址端口对信息:

>>> s.accept()
(<socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 55555), raddr=('127.0.0.1', 41990)>, ('127.0.0.1', 41990))

如果程序还需要处理另一个监听套接字,事情就尴尬了。假设我们在 s 上等待客户端连接,这时 accept 将阻塞;就算 s2 套接字上来了新连接,也无法提前返回:

>>> s2 = create_listen_socket(bind_port=44444)
>>> s.accept()

这该怎么办呢?我们先把套接字设置成 非阻塞 状态,accept 就不会一直阻塞了:

>>> s.setblocking(False)
>>> s.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable

由于 s 套接字上没有新连接,accept 将抛出 BlockingIOError 异常,以此告知调用者。这时,我们就可以抽出身来处理 s2 了。如果 s2 也没有新连接了,我们又再次检查 s 。

>>> s2.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable
>>> s.accept()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fasion/opt/python3.8.5/lib/python3.8/socket.py", line 292, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 11] Resource temporarily unavailable

最终,我们将在 s 和 s2 间来回 轮询 ,但轮询很浪费 CPU 资源!特别是套接字很多时,更是如此!如果能让内核同时关注多个套接字,当它们中有新连接达到时再通知我们就好了 —— 这就是 epoll 擅长的事。

当监听套接字上有新连接时,它会产生 读事件 。因此,我们可以创建一个 epoll 描述符,并将 s 和 s2 注册进去,订阅 读事件 ( EPOLLIN ):

>>> import select
>>> ep = select.epoll()
>>> ep.register(s.fileno(), select.EPOLLIN)
>>> ep.register(s2.fileno(), select.EPOLLIN)

接着,我们调用 poll 方法,等待我们感兴趣的事件:

>>> events = ep.poll()

poll 将一直阻塞,直到 s 或 s2 上有新连接达到。试着连一下 s2 :

$ telnet 127.0.0.1 44444

poll 立马停止阻塞,并向我们返回了一个事件列表,列表项是一个由 文件描述符事件掩码 组成的元组:

>>> events
[(6, 1)]
>>> for fileno, event in events:
...     print(fileno, event)
...
6 1

这个信息告诉我们,哪个套接字上有什么事件发生。如此一来,程序可以精准处理套接字,无须傻傻 轮询 。这就是 epoll 的强大能力,它让高效处理大规模套接字成为可能。