# 从生成器看协程本质

# 参考

# 前言

学习协程的第一门课程,是要认识生成器,有了生成器的基础,才能更好地理解协程

友情提醒: 代码均在Python3下编写。Python2中可能有所差异。

# 可迭代、迭代器、生成器

初学Python的时候,对于这三货真的是傻傻分不清。甚至还认为他们是等价的。

其实,他们是不一样的。

我们可以借助collections.abc这个模块(Python2没有),使用isinstance()来类别一个对象是否是可迭代的(Iterable),是否是迭代器(Iterator),是否是生成器(Generator)。

这几个判断方法,在这里适用,但并不是绝对适用,原因见后面补充说明。

import collections
from collections.abc import Iterable, Iterator, Generator

# 字符串
astr = "XiaoMing"
print("字符串:{}".format(astr))
print('可迭代: ', isinstance(astr, Iterable))
print('迭代器: ', isinstance(astr, Iterator))
print('生成器: ', isinstance(astr, Generator))

# 列表
alist = [21, 23, 32,19]
print("列表:{}".format(alist))
print('可迭代: ', isinstance(alist, Iterable))
print('迭代器: ', isinstance(alist, Iterator))
print('生成器: ', isinstance(alist, Generator))

# 字典
adict = {"name": "小明", "gender": "男", "age": 18}
print("字典:{}".format(adict))
print('可迭代: ', isinstance(adict, Iterable))
print('迭代器: ', isinstance(adict, Iterator))
print('生成器: ', isinstance(adict, Generator))

# deque
adeque=collections.deque('abcdefg')
print("deque:{}".format(adeque))
print('可迭代: ', isinstance(adeque, Iterable))
print('迭代器: ', isinstance(adeque, Iterator))
print('生成器: ', isinstance(adeque, Generator))

输出结果

字符串:XiaoMing
可迭代:  True
迭代器:  False
生成器:  False
列表:[21, 23, 32, 19]
可迭代:  True
迭代器:  False
生成器:  False
字典:{'name': '小明', 'gender': '男', 'age': 18}
可迭代:  True
迭代器:  False
生成器:  False
deque:deque(['a', 'b', 'c', 'd', 'e', 'f', 'g'])
可迭代:  True
迭代器:  False
生成器:  False

从结果来看,这些可迭代对象都不是迭代器,也不是生成器。

# 关于可迭代对象

  1. 可以通过,dir()方法查看,若有有__iter__说明是可迭代的,但是如果没有,也不能说明不可迭代,原因见第二条。
  2. 判断是否可迭代,不能仅看是否有__iter__ 来草率决定,因为只实现了__getitem__ 方法的也有可能是可迭代的。因为当没有__iter__时, Python 解释器会去找__getitem__,尝试按顺序(从索引0开始)获取元素,不抛异常,即是可迭代。
  3. 所以,最好的判断方法应该是通过 for循环或者iter() 去真实运行或者用isinstance(xx, Iterable)来进行判定。

# 关于迭代器

对比可迭代对象,迭代器其实就只是多了一个函数而已。就是__next__(),我们可以不再使用for循环来间断获取元素值。而可以直接使用next()方法来实现。

迭代器,是在可迭代的基础上实现的。要创建一个迭代器,我们首先,得有一个可迭代对象。 现在就来看看,如何创建一个可迭代对象,并以可迭代对象为基础创建一个迭代器。

from collections.abc import Iterable, Iterator, Generator

class MyList(object):  # 定义可迭代对象类

    def __init__(self, num):
        self.end = num  # 上边界

    # 返回一个实现了__iter__和__next__的迭代器类的实例
    def __iter__(self):
        return MyListIterator(self.end)


class MyListIterator(object):  # 定义迭代器类

    def __init__(self, end):
        self.data = end  # 上边界
        self.start = 0

    # 返回该对象的迭代器类的实例;因为自己就是迭代器,所以返回self
    def __iter__(self):
        return self

    # 迭代器类必须实现的方法,若是Python2则是next()函数
    def __next__(self):
        while self.start < self.data:
            self.start += 1
            return self.start - 1
        raise StopIteration


if __name__ == '__main__':
    my_list = MyList(5)  # 得到一个可迭代对象
    print(isinstance(my_list, Iterable))  # True
    print(isinstance(my_list, Iterator))  # False
    # 迭代
    for i in my_list:
        print(i)

    my_iterator = iter(my_list)  # 得到一个迭代器
    print(isinstance(my_iterator, Iterable))  # True
    print(isinstance(my_iterator, Iterator))  # True

    # 迭代
    print(next(my_iterator))
    print(next(my_iterator))
    print(next(my_iterator))
    print(next(my_iterator))
    print(next(my_iterator))

输出

0
1
2
3
4

True
False

True
True

0
1
2
3
4

如果上面的代码太多,也可以看这边,你更能理解。

>>> l = [1,2,3,4,5]
>>> dir(l)
['__add__', '__class__', '__class_getitem__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__iadd__', '__imul__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__mul__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__reversed__', '__rmul__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'append', 'clear', 'copy', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort']
>>> lit = l.__iter__()
>>> dir(lit)
['__class__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__length_hint__', '__lt__', '__ne__', '__new__', '__next__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str__', '__subclasshook__']
>>> lit.__next__()
1
>>> lit.__next__()
2
>>> lit.__next__()
3
>>> lit.__next__()
4
>>> lit.__iter__() is lit
True

补充说明:

  1. 迭代器,是其内部实现了,__next__ 这个魔术方法。(Python3.x)
  2. 可以通过,dir()方法来查看是否有__next__来判断一个变量是否是迭代器的。

# 关于生成器

接下来,是我们的重点,生成器

生成器的概念在 Python 2.2 中首次出现,之所以引入生成器,是为了实现一个在计算下一个值时不需要浪费空间的结构。

前面我们说,迭代器,是在可迭代的基础上,加了一个next()方法。 而生成器,则是在迭代器的基础上(可以用for循环,可以使用next()),再实现了yield

yield 相当于函数中的return。在每次next(),或者for遍历的时候,都会yield这里将新的值返回回去,并在这里阻塞,等待下一次的调用。正是由于这个机制,才使用生成器在Python编程中大放异彩。实现节省内存,实现异步编程。

yield除了return机制,还可以在下次再次调用的时候接受一个值并赋左值(下面的x变量),这里不太关注这点,下不赘述。

def f():
    for i in range(10):
        x = yield i
        print(f'{i} - {x}')

如何创建一个生成器:

L = (x * x for x in range(10))
print(isinstance(L, Generator))  # True

或是一个带yield的函数

# 实现了yield的函数
def mygen(n):
    now = 0
    while now < n:
        yield now
        now += 1

if __name__ == '__main__':
    gen = mygen(10)
    print(isinstance(gen, Generator))  # True

可迭代对象和迭代器,是将所有的值都生成存放在内存中,而生成器则是需要元素才临时生成,节省时间,节省空间。

# 运行/激活生成器

由于生成器并不是一次生成所有元素,而是一次一次的执行返回,那么如何刺激生成器执行(或者说激活)呢?

激活主要有两个方法 - 使用next() - 使用generator.send(None)

分别看下例子,你就知道了。

def mygen(n):
    now = 0
    while now < n:
        yield now
        now += 1

if __name__ == '__main__':
    gen = mygen(4)

    # 通过交替执行,来说明这两种方法是等价的。
    print(gen.send(None))
    print(next(gen))
    print(gen.send(None))
    print(next(gen))

输出

0
1
2
3

# 生成器的执行状态

生成器在其生命周期中,会有如下四个状态

  • GEN_CREATED ## 等待开始执行
  • GEN_RUNNING ## 解释器正在执行(只有在多线程应用中才能看到这个状态)
  • GEN_SUSPENDED ## 在yield表达式处暂停
  • GEN_CLOSED ## 执行结束

通过代码来感受一下,为了不增加代码理解难度,GEN_RUNNING这个状态,我就不举例了。有兴趣的同学,可以去尝试一下多线程。若有疑问,可在后台回复我。

from inspect import getgeneratorstate

def mygen(n):
    now = 0
    while now < n:
        yield now
        now += 1

if __name__ == '__main__':
    gen = mygen(2)
    print(getgeneratorstate(gen))

    print(next(gen))
    print(getgeneratorstate(gen))

    print(next(gen))
    gen.close()  # 手动关闭/结束生成器
    print(getgeneratorstate(gen))

输出

GEN_CREATED
0
GEN_SUSPENDED
1
GEN_CLOSED

# 生成器的异常处理

在生成器工作过程中,若生成器不满足生成元素的条件,就/应该 抛出异常(StopIteration)。

通过列表生成式构建的生成器,其内部已经自动帮我们实现了抛出异常这一步。不信我们来看一下。

image2

所以我们在自己定义一个生成器的时候,我们也应该在不满足生成元素条件的时候,抛出异常。 拿上面的代码来修改一下。

def mygen(n):
    now = 0
    while now < n:
        yield now
        now += 1
    raise StopIteration

if __name__ == '__main__':
    gen = mygen(2)
    next(gen)
    next(gen)
    next(gen)

# 过渡到协程:yield

通过上面的介绍,我们知道生成器为我们引入了暂停函数执行(yield)的功能。

当有了暂停的功能之后,人们就想能不能在生成器暂停的时候向其发送一点东西(其实上面也有提及:send(None))。这种向暂停的生成器发送信息的功能通过 PEP 342 进入 Python 2.5 中,并催生了 Python协程的诞生。根据 wikipedia 中的定义:协程是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不同位置暂停或开始执行程序。

以非抢占多任务为关键字再理解理解协程两个字。

注意从本质上而言,协程并不属于语言中的概念,而是编程模型上的概念。

协程和线程,有相似点,多个协程之间和线程一样,只会交叉串行执行;也有不同点,线程之间要频繁进行切换,加锁,解锁,从复杂度和效率来看,和协程相比,这确是一个痛点。协程通过使用 yield 暂停生成器,可以将程序的执行流程交给其他的子程序,从而实现不同子程序的之间的交替执行,由于切换在用户态执行并且不同协程之间一般是没有锁的,因此效率较线程会好很多。

下面通过一个简明的演示来看看,如何向生成器中发送消息。

def jumping_range(N):
    index = 0
    while index < N:
        # 通过send()发送的信息将赋值给jump
        jump = yield index
        if jump is None:
            jump = 1
        index += jump

if __name__ == '__main__':
    itr = jumping_range(5)
    print(next(itr))
    print(itr.send(2))
    print(next(itr))
    print(itr.send(-1))

输出。

0
2
3
2

这里解释下为什么这么输出。 重点是jump = yield index这个语句。

分成两部分: - yield index 是将index return给外部调用程序。 - jump = yield 可以接收外部程序通过send()发送的信息,并赋值给jump

# 协程的意义

在python中,由于GIL的存在,多线程并不能发挥出多核CPU的性能,而协程的出现,恰好可以解决这个问题,它的特点有:

python3.14版本时已经官宣要逐步解决GIL的问题了

  1. 协程是在单线程里实现任务的切换的
  2. 利用同步的方式去实现异步
  3. 不再需要锁,提高了并发性能

举个例子。 假如我们做一个爬虫。我们要爬取多个网页,这里简单举例两个网页(两个spider函数),获取HTML(耗IO耗时),然后再对HTML对行解析取得我们感兴趣的数据。

我们的代码结构精简如下:

def spider_01(url):
    html = get_html(url)
    ...
    data = parse_html(html)

def spider_02(url):
    html = get_html(url)
    ...
    data = parse_html(html)

我们都知道,get_html()等待返回网页是非常耗IO的,一个网页还好,如果我们爬取的网页数据极其庞大,这个等待时间就非常惊人,是极大的浪费。

聪明的程序员,当然会想如果能在get_html()这里暂停一下,不用傻乎乎地去等待网页返回,而是去做别的事。等过段时间再回过头来到刚刚暂停的地方,接收返回的html内容,然后还可以接下去解析parse_html(html)

利用常规的方法,几乎是没办法实现如上我们想要的效果的。所以Python想得很周到,从语言本身给我们实现了这样的功能,这就是yield语法。可以实现在某一函数中暂停的效果。

试着思考一下,假如没有协程,我们要写一个并发程序。可能有以下问题 1. 使用最常规的同步编程要实现异步并发效果并不理想,或者难度极高。 2. 由于GIL锁的存在,多线程的运行需要频繁的加锁解锁,切换线程,这极大地降低了并发性能;

而协程的出现,刚好可以解决以上的问题。

# 协程核心:yield from

yield from 是在Python3.3才出现的语法。所以这个特性在Python2中是没有的。

yield from 后面需要加的是可迭代对象,它可以是普通的可迭代对象,也可以是迭代器,甚至是生成器。

我们可以用一个使用yield和一个使用yield from的例子来对比看下他们之间的异同。

使用yield

# 字符串
astr='ABC'
# 列表
alist=[1,2,3]
# 字典
adict={"name":"wangbm","age":18}
# 生成器
agen=(i for i in range(4,8))

def gen(*args, **kw):
    for item in args:
        for i in item:
            yield i

new_list=gen(astr, alist, adict, agen)
print(list(new_list))
# ['A', 'B', 'C', 1, 2, 3, 'name', 'age', 4, 5, 6, 7]

使用yield from

# 字符串
astr='ABC'
# 列表
alist=[1,2,3]
# 字典
adict={"name":"wangbm","age":18}
# 生成器
agen=(i for i in range(4,8))

def gen(*args, **kw):
    for item in args:
        yield from item

new_list=gen(astr, alist, adict, agen)
print(list(new_list))
# ['A', 'B', 'C', 1, 2, 3, 'name', 'age', 4, 5, 6, 7]

由上面两种方式对比,可以看出,yield from后面加上可迭代对象,他可以把可迭代对象里的每个元素一个一个的yield出来,对比yield来说代码更加简洁,结构更加清晰。

简单的看确实yield from 做的事情只是把可迭代对象里的每个元素一个一个的yield出来,但其实质上起到了一个代理中介 或者 管道 的作用。

继续讲解之前,首先明确几个概念:

  1. 调用方:调用委派生成器的客户端(调用方)代码
  2. 委托生成器:包含yield from表达式的生成器函数
  3. 子生成器:yield from后面加的生成器函数

下面的例子是实现实时计算平均值的。 比如,第一次传入10,那返回平均数自然是10. 第二次传入20,那返回平均数是(10+20)/2=15 第三次传入30,那返回平均数(10+20+30)/3=20

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        new_num = yield average
        count += 1
        total += new_num
        average = total/count

# 委托生成器
def proxy_gen():
    while True:
        yield from average_gen()

# 调用方
def main():
    calc_average = proxy_gen()
    next(calc_average)            # 预激下生成器
    print(calc_average.send(10))  # 打印:10.0
    print(calc_average.send(20))  # 打印:15.0
    print(calc_average.send(30))  # 打印:20.0

if __name__ == '__main__':
    main()

仔细研究代码,调用方可以通过send()直接发送消息给子生成器,而子生成器yield的值,也是直接返回给调用方,而不需要在中间做任何处理,这就是yield from的作用 - 在调用方与子生成器之间建立一个双向通道

你可能会经常看到有些代码,还可以在yield from前面看到可以赋值,但这实际上也不会拦截任何值,例子如下:

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        new_num = yield average
        if new_num is None:
            break
        count += 1
        total += new_num
        average = total/count

    # 每一次return,都意味着当前协程结束。
    return total,count,average

# 委托生成器
def proxy_gen():
    while True:
        # 只有子生成器要结束(return)了,yield from左边的变量才会被赋值,后面的代码才会执行。
        total, count, average = yield from average_gen()
        print("计算完毕!!\n总共传入 {} 个数值, 总和:{},平均数:{}".format(count, total, average))

# 调用方
def main():
    calc_average = proxy_gen()
    next(calc_average)            # 预激协程
    print(calc_average.send(10))  # 打印:10.0
    print(calc_average.send(20))  # 打印:15.0
    print(calc_average.send(30))  # 打印:20.0
    calc_average.send(None)      # 结束协程
    # 如果此处再调用calc_average.send(10),由于上一协程已经结束,将重开一协程

if __name__ == '__main__':
    main()

运行后,输出

10.0
15.0
20.0
计算完毕!!
总共传入 3 个数值, 总和:60,平均数:20.0

# yield from出现的意义

学到这里,我相信你肯定要问,既然委托生成器,起到的只是一个双向通道的作用,我还需要委托生成器做什么?我调用方直接调用子生成器不就好啦?

简单来说,yield from帮我们在背后做了很多处理。

例如stopiteration异常:

# 子生成器
# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        new_num = yield average
        if new_num is None:
            break
        count += 1
        total += new_num
        average = total/count
    return total,count,average

# 调用方
def main():
    calc_average = average_gen()
    next(calc_average)            # 预激协程
    print(calc_average.send(10))  # 打印:10.0
    print(calc_average.send(20))  # 打印:15.0
    print(calc_average.send(30))  # 打印:20.0

    # ----------------注意-----------------
    try:
        calc_average.send(None)
    except StopIteration as e:
        total, count, average = e.value
        print("计算完毕!!\n总共传入 {} 个数值, 总和:{},平均数:{}".format(count, total, average))
    # ----------------注意-----------------

if __name__ == '__main__':
    main()

下面是yield from语法在CPython中的源码实现:

#一些说明
"""
_i:子生成器,同时也是一个迭代器
_y:子生成器生产的值
_r:yield from 表达式最终的值
_s:调用方通过send()发送的值
_e:异常对象
"""

_i = iter(EXPR)

try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value

else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r

以上的代码,稍微有点复杂,有兴趣的同学可以结合以下说明去研究看看。

  1. 迭代器(即可指子生成器)产生的值直接返还给调用者
  2. 任何使用send()方法发给委派生产器(即外部生产器)的值被直接传递给迭代器。如果send值是None,则调用迭代器next()方法;如果不为None,则调用迭代器的send()方法。如果对迭代器的调用产生StopIteration异常,委派生产器恢复继续执行yield from后面的语句;若迭代器产生其他任何异常,则都传递给委派生产器。
  3. 子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法,即可能会产生AttributeError 异常。
  4. 除了GeneratorExit 异常外的其他抛给委派生产器的异常,将会被传递到迭代器的throw()方法。如果迭代器throw()调用产生了StopIteration异常,委派生产器恢复并继续执行,其他异常则传递给委派生产器。
  5. 如果GeneratorExit异常被抛给委派生产器,或者委派生产器的close()方法被调用,如果迭代器有close()的话也将被调用。如果close()调用产生异常,异常将传递给委派生产器。否则,委派生产器将抛出GeneratorExit 异常。
  6. 当迭代器结束并抛出异常时,yield from表达式的值是其StopIteration 异常中的第一个参数。
  7. 一个生成器中的return expr语句将会从生成器退出并抛出 StopIteration(expr)异常。

没兴趣看的同学,只要知道,yield from帮我们做了很多的异常处理,而且全面,而这些如果我们要自己去实现的话,一个是编写代码难度增加,写出来的代码可读性极差,这些我们就不说了,最主要的是很可能有遗漏,只要哪个异常没考虑到,都有可能导致程序崩溃什么的。

# 新语法:await

await 语法首先被应用在C#语言中,在Python3.5版本被引入python标准,同时引入的还有async和future。

我们可以简单的认为 await 就是 yield from 的缩写,虽然两者在语法上不兼容,但背后处理的逻辑却差不多。

# asyncio原理简介

有了yield 语法尤其是yield from语法后,各种协程库实现都慢慢出现,其中比较有名的有asyncio和gevent。这里我们尝试去简单描述一下asyncio的实现原理。

注意,这里的基本原理仅供理解,会省略甚至修改不少实现细节。

以下面的例子进行原理的解释:

import asyncio as mini_asyncio

import time
import random

async def hi(msg, sec):
    print(f"enter hi({msg}, {sec}), {time.strftime('%H:%M:%S')}")
    await mini_asyncio.sleep(sec)
    print(f"exit  hi({msg}, {sec}), {time.strftime('%H:%M:%S')}")
    return f'{msg}-{sec}'

async def main():
    print(f"main() begin at {time.strftime('%H:%M:%S')}")
    tasks = []
    for i in range(1, 5):
        t = mini_asyncio.create_task(hi(i, random.randint(1, 3))) # 四个task会被创建并在创建时扔到_ready队列中
                                                        # 实际被加到_ready中的是Handle类包裹的task的_step方法
        tasks.append(t)
    
    for t in tasks:
        print(f"main await at {time.strftime('%H:%M:%S')}")
        b = await t
        print(f"b is: {b}")
    
    print(f"main() end at {time.strftime('%H:%M:%S')}")

mini_asyncio.run(main())

"""

main() begin at 19:56:22
main await at 19:56:22
enter hi(1, 2), 19:56:22
enter hi(2, 2), 19:56:22
enter hi(3, 3), 19:56:22
enter hi(4, 1), 19:56:22
exit  hi(4, 1), 19:56:23
exit  hi(1, 2), 19:56:24   
exit  hi(2, 2), 19:56:24
b is: 1-2
main await at 19:56:24
b is: 2-2
main await at 19:56:24
exit  hi(3, 3), 19:56:25
b is: 3-3
main await at 19:56:25
b is: 4-1
main() end at 19:56:25
"""

  1. 事件循环是asyncio的绝对核心,其负责所有任务的管理和调度
  2. 每个任务可以认为是对一个协程的封装
  3. 当await 一个协程或者一个任务时 等同于执行yield from 或者 send(None), 相当于进入生成器内部执行一步
  4. 当任务await一个主协程并且这个子协程yield一个future对象时,会把主协程设置为这个future对象的callback-在future对象执行完(一般都是有结果)之后就会自动返回来执行主协程
  5. 协程利用了操作系统提供的IO多路复用机制,当真的有一个仅仅需要等待而不需要CPU参与的活(例如等待网络请求)出现时,协程库会把这个事件(网络请求到达)注册到操作系统,并在自己某一个字典里标记这个事件的回调是当前这个任务,在事件触发前都不会再看当前这个任务
  6. 操作系统提供的IO多路复用机制有select、poll、epoll等,实现的功能相同但效率和实现方式不同,不细说
  7. 事件循环本身是一个无限循环,每次循环开始时都会检查自己注册到操作系统的事件是否有已经触发的,如果有,则把其回调放进待执行队列中
  8. 然后事件循环一次执行待执行队列中的任务