Implementing a golang-like channels structure in python
I am trying to implement golang-like channels in Python, using the asyncio package and a simple generator.
import asyncio

def channel(capacity):
    # A very lazy channel implementation
    size = 0
    data = []
    while size < capacity:
        item = yield
        data.append(item)
        size += 1
    for item in data:
        yield item
async def worker(id: int, jobs: channel, results: channel):
    print(f"worker : {id} started")
    await asyncio.sleep(0)
    for job in jobs:
        print(f"worker : {id} received job {job}")
        result = job * job
        await asyncio.sleep(1)
        results.send(result)
async def main():
    jobs = channel(10)
    results = channel(10)
    # priming jobs and results channel
    next(jobs)
    next(results)
    for i in range(10):
        jobs.send(i)
    await worker(1, jobs, results)
    await worker(2, jobs, results)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Even though the code appears to run, the second worker is not spawned until the first worker has consumed all the jobs. I added asyncio.sleep(0) to force a context switch, but the first worker still does not hand control back to the main coroutine.
What am I doing wrong here? Does it have something to do with the channel generator?
Output:
worker : 1 started
worker : 1 received job 1
worker : 1 received job 2
worker : 1 received job 3
worker : 1 received job 4
worker : 1 received job 5
worker : 1 received job 6
worker : 1 received job 7
worker : 1 received job 8
worker : 1 received job 9
worker : 2 started
When you do:
await worker(1, jobs, results)
you are waiting for this coroutine to finish, and only once it is done (i.e. once the channel is empty) does the next one execute. So the first worker consumes all the items.
To fix this, you have to run both coroutines concurrently.
With gather, which turns the coroutines into tasks and waits until they are all done:
# with gather
await asyncio.gather(
    worker(1, jobs, results),
    worker(2, jobs, results),
)
Or with tasks (from the docs):
Wrap the coro coroutine into a Task and schedule its execution. Return the Task object.
and then await the tasks:
task_1 = asyncio.create_task(worker(1, jobs, results))
task_2 = asyncio.create_task(worker(2, jobs, results))
await task_1
await task_2
Also, don't roll your own channel: use asyncio.Queue with maxsize as the channel.
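A minimal sketch of that suggestion, rewriting the original worker pool on top of asyncio.Queue. The None sentinel and the list returned from main are additions for illustration, not part of the original code:

```python
import asyncio

async def worker(wid: int, jobs: asyncio.Queue, results: asyncio.Queue) -> None:
    # await jobs.get() yields to the event loop, so both workers interleave
    while True:
        job = await jobs.get()
        if job is None:  # sentinel: no more jobs for this worker
            return
        await results.put(job * job)

async def main() -> list:
    jobs = asyncio.Queue(maxsize=10)  # bounded, like channel(10)
    results = asyncio.Queue()
    # start the workers first so the puts below never block on a full queue
    workers = [asyncio.create_task(worker(i, jobs, results)) for i in (1, 2)]
    for i in range(10):
        await jobs.put(i)
    for _ in workers:  # one sentinel per worker
        await jobs.put(None)
    await asyncio.gather(*workers)
    return [results.get_nowait() for _ in range(results.qsize())]

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike the generator version, jobs.get() and jobs.put() are real await points, so the event loop can switch between workers on every job.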