在 python 中实现类似 golang 的通道结构

Implementing a golang like channels structure in python

我试图在 python 中实现像 channel 这样的 golang。我使用 asyncio 包和一些简单的生成器来实现这个。

import asyncio


def channel(capacity):
    # A very lazy channel implementation
    size = 0
    data = []
    while size < capacity:
        item = yield
        data.append(item)
        size += 1
    for item in data:
        yield item


async def worker(id:int, jobs:channel, results:channel):
    print(f"worker : {id} started")
    await asyncio.sleep(0)
    for job in jobs:
        print(f"worker : {id} received job {job}")
        result = job * job
        await asyncio.sleep(1)
        results.send(result)


async def main():
    jobs = channel(10)
    results = channel(10)
    # priming jobs and results channel
    next(jobs)
    next(results)
    for i in range(10):
        jobs.send(i)
    await worker(1, jobs, results)
    await worker(2, jobs, results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

即使代码看起来 运行,第二个工人在第一个工人消耗完所有工作之前不会产卵。

我添加了 asyncio.sleep(0) 以强制上下文切换,但第一个 worker 仍然没有将控制权返回给主协程。

我在这里做错了什么?跟channel generator有关系吗?

结果:

worker : 1 started
worker : 1 received job 1
worker : 1 received job 2
worker : 1 received job 3
worker : 1 received job 4
worker : 1 received job 5
worker : 1 received job 6
worker : 1 received job 7
worker : 1 received job 8
worker : 1 received job 9
worker : 2 started

当你这样做时:

    await worker(1, jobs, results)

你正在等待这个协程结束,当它结束时执行下一个(一旦通道为空)。所以第一个工人消耗了所有的物品。

要解决这个问题,您必须 运行 将两个协同程序放在一起。

使用gather,将所有协程转换为任务并等待它们全部完成:

    # with gather
    await asyncio.gather(
        worker(1, jobs, results),
        worker(2, jobs, results)
    )

tasks(来自文档):

Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.

然后等待任务。

    task_1 = asyncio.create_task(worker(1, jobs, results))
    task_2 = asyncio.create_task(worker(2, jobs, results))

    await task_1
    await task_2

也不要创建您的频道,使用 asyncio.Queuemaxsize 作为频道。