# mini协程库

# 引入Future

引入 Future ,代表一个在未来才能获取到的数据。Future 一般由协程创建,典型的场景是这样的:协程在等待一个 IO 事件,这时它便创建一个 Future 对象,并把执行权归还给事件循环。

例子中的 Future 类,有 4 个重要的属性:

  • loop ,当前事件循环对象;
  • done ,标识目标数据是否就绪;
  • result ,目标数据;
  • co ,关联协程,Future 就绪后,事件循环 loop 将把它放入可执行队列重新调度;

注意到,Future 是一个 可等待对象 ( awaitable ),它实现了 await 方法。当数据未就绪时,通过 yield 让出执行权,这时事件循环将协程记录在 Future 中。当数据就绪后,事件循环将协程放回可执行队列重新调度。

# 实现

import select

from collections import deque
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR

def create_listen_socket(bind_addr='0.0.0.0', bind_port=55555, backlogs=102400):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind((bind_addr, bind_port))
    sock.listen(backlogs)
    return sock

class Future:

    def __init__(self, loop):
        self.loop = loop
        self.done = False
        self.result = None
        self.co = None

    def set_coroutine(self, co):
        self.co = co

    def set_result(self, result):
        self.done = True
        self.result = result

        if self.co:
            self.loop.add_coroutine(self.co)

    def __await__(self):
        if not self.done:
            yield self
        return self.result

class AsyncSocket:

    def __init__(self, sock, loop):
        sock.setblocking(False)

        self.sock = sock
        self.loop = loop

    def fileno(self):
        return self.sock.fileno()

    def create_future_for_events(self, events):
        future = self.loop.create_future()

        def handler(fileno, active_events):
            self.loop.unregister_from_polling(self.fileno())
            future.set_result(active_events)

        self.loop.register_for_polling(self.fileno(), events, handler)

        return future

    async def accept(self):
        while True:
            try:
                sock, addr = self.sock.accept()
                return AsyncSocket(sock=sock, loop=self.loop), addr
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def recv(self, bufsize):
        while True:
            try:
                return self.sock.recv(bufsize)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLIN)
                await future

    async def send(self, data):
        while True:
            try:
                return self.sock.send(data)
            except BlockingIOError:
                future = self.create_future_for_events(select.EPOLLOUT)
                await future

class EventLoop:

    def __init__(self):
        self.epoll = select.epoll()

        self.runnables = deque()
        self.handlers = {}

    def create_future(self):
        return Future(loop=self)

    def create_listen_socket(self, bind_addr, bind_port, backlogs=102400):
        sock = create_listen_socket(bind_addr, bind_port, backlogs)
        return AsyncSocket(sock=sock, loop=self)

    def register_for_polling(self, fileno, events, handler):
        print('register fileno={} for events {}'.format(fileno, events))
        self.handlers[fileno] = handler
        self.epoll.register(fileno, events)

    def unregister_from_polling(self, fileno):
        print('unregister fileno={}'.format(fileno))
        self.epoll.unregister(fileno)
        self.handlers.pop(fileno)

    def add_coroutine(self, co):
        self.runnables.append(co)

    def run_coroutine(self, co):
        try:
            future = co.send(None)
            future.set_coroutine(co)
        except StopIteration as e:
            print('coroutine {} stopped'.format(co.__name__))

    def schedule_runnable_coroutines(self):
        while self.runnables:
            self.run_coroutine(co=self.runnables.popleft())

    def run_forever(self):
        while True:
            self.schedule_runnable_coroutines()

            events = self.epoll.poll(1)
            for fileno, event in events:
                handler = self.handlers.get(fileno)
                if handler:
                    handler(fileno, events)

class TcpServer:

    def __init__(self, loop, bind_addr='0.0.0.0', bind_port=55555):
        self.loop = loop
        self.listen_sock = self.loop.create_listen_socket(bind_addr=bind_addr, bind_port=bind_port)
        self.loop.add_coroutine(self.serve_forever())

    async def serve_client(self, sock):
        while True:
            data = await sock.recv(1024)
            if not data:
                print('client disconnected')
                break

            await sock.send(data.upper())

    async def serve_forever(self):
        while True:
            sock, (addr, port) = await self.listen_sock.accept()
            print('client connected addr={} port={}'.format(addr, port))

            self.loop.add_coroutine(self.serve_client(sock))

def main():
    loop = EventLoop()
    server = TcpServer(loop=loop)
    loop.run_forever()

if __name__ == '__main__':
    main()

TcpServer 只是一个普通的协程式应用,无须赘述。接下来,我们逐步分析,看看程序启动后都发生什么事情:

  1. 创建事件循环 EventLoop 对象,它将创建 epoll 描述符;
  2. 创建 TcpServer 对象,它通过事件循环 loop 创建监听套接字,并将 serve_forever 协程放入可执行队列;
  3. 事件循环 loop.run_forever 开始执行,它先调度可执行队列;
  4. 可执行队列一开始只有一个协程 TcpServer.serve_forever ,它将开始执行 (由 run_coroutine 驱动);
  5. 执行权来到 TcpServer.serve_forever 协程,它调用 AsyncSocket.accept 准备接受一个新连接;
  6. 假设原生套接字未就绪,它将抛出 BlockingIOError 异常;
  7. 由于 IO 未就绪,协程创建一个 Future 对象,用来等待一个未来的 IO 事件 ( AsyncSocket.accept );
  8. 于此同时,协程调用事件循环 register_for_polling 方法订阅 IO 事件,并注册回调处理函数 handler
  9. future 是可以个可等待对象,await future 将执行权交给它的 await 函数;
  10. 由于一开始 future 是未就绪的,这时 yield 将协程执行逐层归还给事件循环,future 对象也被同时上报;
  11. 执行权回到事件循环,run_coroutine 收到协程上报的 future 后将协程设置进去,以便 future 就绪后重新调度协程;
  12. 可执行队列变空后,事件循环开始调用 epoll.poll 等待协程注册的 IO 事件 ( serve_forever );
  13. 当注册事件到达后,事件循环取出回调处理函数并调用;
  14. handler 先将套接字从 epoll 解除注册,然后调用 set_result 将活跃事件作为目标数据记录到 future 中;
  15. set_result 将协程重新放回可执行队列;
  16. IO 事件处理完毕,进入下一次事件循环;
  17. 事件循环再次调度可执行队列,这时 TcpServer.serve_forever 协程再次拿到执行权;
  18. TcpServer.serve_forever 协程从 yield 语句恢复执行,开始返回目标数据,也就是先前设置的活跃事件;
  19. AsyncSocket.acceptawait future 语句取得活跃事件,然后循环继续;
  20. 循环再次调用原生套接字,这时它早已就绪,得到一个新套接字,简单包装后作为结果返回给调用者;
  21. TcpServer.serve_forever 拿到代表新连接的套接字后,创建一个 serve_client 协程并交给事件循环 loop
  22. TcpServer.serve_forever 进入下一次循环,调用 accept 准备接受下一个客户端连接;
  23. 如果监听套接字未就绪,执行权再次回到事件循环;
  24. 事件循环接着调度可执行队列里面的协程,TcpServer.serve_client 协程也开始执行了;
  25. etc