# mini协程库
# 引入Future
引入 Future ,代表一个在未来才能获取到的数据。Future 一般由协程创建,典型的场景是这样的:协程在等待一个 IO 事件,这时它便创建一个 Future 对象,并把执行权归还给事件循环。
例子中的 Future 类,有 4 个重要的属性:
- loop ,当前事件循环对象;
- done ,标识目标数据是否就绪;
- result ,目标数据;
- co ,关联协程,Future 就绪后,事件循环 loop 将把它放入可执行队列重新调度;
注意到,Future 是一个 可等待对象 ( awaitable ),它实现了 await 方法。当数据未就绪时,通过 yield 让出执行权,这时事件循环将协程记录在 Future 中。当数据就绪后,事件循环将协程放回可执行队列重新调度。
# 实现
import select
from collections import deque
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
def create_listen_socket(bind_addr='0.0.0.0', bind_port=55555, backlogs=102400):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind((bind_addr, bind_port))
sock.listen(backlogs)
return sock
class Future:
def __init__(self, loop):
self.loop = loop
self.done = False
self.result = None
self.co = None
def set_coroutine(self, co):
self.co = co
def set_result(self, result):
self.done = True
self.result = result
if self.co:
self.loop.add_coroutine(self.co)
def __await__(self):
if not self.done:
yield self
return self.result
class AsyncSocket:
def __init__(self, sock, loop):
sock.setblocking(False)
self.sock = sock
self.loop = loop
def fileno(self):
return self.sock.fileno()
def create_future_for_events(self, events):
future = self.loop.create_future()
def handler(fileno, active_events):
self.loop.unregister_from_polling(self.fileno())
future.set_result(active_events)
self.loop.register_for_polling(self.fileno(), events, handler)
return future
async def accept(self):
while True:
try:
sock, addr = self.sock.accept()
return AsyncSocket(sock=sock, loop=self.loop), addr
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLIN)
await future
async def recv(self, bufsize):
while True:
try:
return self.sock.recv(bufsize)
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLIN)
await future
async def send(self, data):
while True:
try:
return self.sock.send(data)
except BlockingIOError:
future = self.create_future_for_events(select.EPOLLOUT)
await future
class EventLoop:
def __init__(self):
self.epoll = select.epoll()
self.runnables = deque()
self.handlers = {}
def create_future(self):
return Future(loop=self)
def create_listen_socket(self, bind_addr, bind_port, backlogs=102400):
sock = create_listen_socket(bind_addr, bind_port, backlogs)
return AsyncSocket(sock=sock, loop=self)
def register_for_polling(self, fileno, events, handler):
print('register fileno={} for events {}'.format(fileno, events))
self.handlers[fileno] = handler
self.epoll.register(fileno, events)
def unregister_from_polling(self, fileno):
print('unregister fileno={}'.format(fileno))
self.epoll.unregister(fileno)
self.handlers.pop(fileno)
def add_coroutine(self, co):
self.runnables.append(co)
def run_coroutine(self, co):
try:
future = co.send(None)
future.set_coroutine(co)
except StopIteration as e:
print('coroutine {} stopped'.format(co.__name__))
def schedule_runnable_coroutines(self):
while self.runnables:
self.run_coroutine(co=self.runnables.popleft())
def run_forever(self):
while True:
self.schedule_runnable_coroutines()
events = self.epoll.poll(1)
for fileno, event in events:
handler = self.handlers.get(fileno)
if handler:
handler(fileno, events)
class TcpServer:
def __init__(self, loop, bind_addr='0.0.0.0', bind_port=55555):
self.loop = loop
self.listen_sock = self.loop.create_listen_socket(bind_addr=bind_addr, bind_port=bind_port)
self.loop.add_coroutine(self.serve_forever())
async def serve_client(self, sock):
while True:
data = await sock.recv(1024)
if not data:
print('client disconnected')
break
await sock.send(data.upper())
async def serve_forever(self):
while True:
sock, (addr, port) = await self.listen_sock.accept()
print('client connected addr={} port={}'.format(addr, port))
self.loop.add_coroutine(self.serve_client(sock))
def main():
loop = EventLoop()
server = TcpServer(loop=loop)
loop.run_forever()
if __name__ == '__main__':
main()
TcpServer 只是一个普通的协程式应用,无须赘述。接下来,我们逐步分析,看看程序启动后都发生什么事情:
- 创建事件循环 EventLoop 对象,它将创建 epoll 描述符;
- 创建 TcpServer 对象,它通过事件循环 loop 创建监听套接字,并将 serve_forever 协程放入可执行队列;
- 事件循环 loop.run_forever 开始执行,它先调度可执行队列;
- 可执行队列一开始只有一个协程 TcpServer.serve_forever ,它将开始执行 (由 run_coroutine 驱动);
- 执行权来到 TcpServer.serve_forever 协程,它调用 AsyncSocket.accept 准备接受一个新连接;
- 假设原生套接字未就绪,它将抛出 BlockingIOError 异常;
- 由于 IO 未就绪,协程创建一个 Future 对象,用来等待一个未来的 IO 事件 ( AsyncSocket.accept );
- 于此同时,协程调用事件循环 register_for_polling 方法订阅 IO 事件,并注册回调处理函数 handler ;
- future 是可以个可等待对象,await future 将执行权交给它的 await 函数;
- 由于一开始 future 是未就绪的,这时 yield 将协程执行逐层归还给事件循环,future 对象也被同时上报;
- 执行权回到事件循环,run_coroutine 收到协程上报的 future 后将协程设置进去,以便 future 就绪后重新调度协程;
- 可执行队列变空后,事件循环开始调用 epoll.poll 等待协程注册的 IO 事件 ( serve_forever );
- 当注册事件到达后,事件循环取出回调处理函数并调用;
- handler 先将套接字从 epoll 解除注册,然后调用 set_result 将活跃事件作为目标数据记录到 future 中;
- set_result 将协程重新放回可执行队列;
- IO 事件处理完毕,进入下一次事件循环;
- 事件循环再次调度可执行队列,这时 TcpServer.serve_forever 协程再次拿到执行权;
- TcpServer.serve_forever 协程从 yield 语句恢复执行,开始返回目标数据,也就是先前设置的活跃事件;
- AsyncSocket.accept 内 await future 语句取得活跃事件,然后循环继续;
- 循环再次调用原生套接字,这时它早已就绪,得到一个新套接字,简单包装后作为结果返回给调用者;
- TcpServer.serve_forever 拿到代表新连接的套接字后,创建一个 serve_client 协程并交给事件循环 loop ;
- TcpServer.serve_forever 进入下一次循环,调用 accept 准备接受下一个客户端连接;
- 如果监听套接字未就绪,执行权再次回到事件循环;
- 事件循环接着调度可执行队列里面的协程,TcpServer.serve_client 协程也开始执行了;
- etc
← epoll和事件循环模型 关键源码 →