如何使用 trio 构建开始和结束同步调用?
how to structure beginning and ending synchronous calls using trio?
我的要求是结构化的三重奏伪代码(实际的三重奏函数调用,但虚拟工作人员在此处填写),因此我可以理解并尝试在同步之间切换的良好流程控制实践和异步进程。
我想执行以下操作...
- 将 json-data 的文件加载到数据字典中
- 旁白:数据字典看起来像 { 'key_a': {(info_dict_a)}, 'key_b': {info_dict_b} }
- 让 n 名工人中的每一个...
- 访问该数据字典以查找下一个要处理的记录信息字典
- 从正在处理的记录中准备一些数据并post将数据url
- 处理 post-响应以更新正在处理记录的信息字典中的 'response' 键
- 用密钥的信息字典更新数据字典
- 用更新后的data-dict覆盖json-data的原始文件
旁白:我知道除了笨拙地重复重写 json 文件之外,还有其他方法可以实现我的总体目标——但我不要求输入;我真的很想充分理解 trio,以便能够将其用于 this 流程。
所以,我想要同步的进程:
- 获取下一条记录到进程的信息字典
- 更新数据字典
- 用更新后的数据字典覆盖json-data的原始文件
trio 的新手,我有工作代码 here ...我相信 是 同步获取下一个要处理的记录(通过使用 trio.Semaphore() 技术)。但我很确定我不会同步保存文件。
几年前学习 Go,我觉得我已经掌握了交织同步和异步调用的方法——但我还没有掌握三重奏。提前致谢。
以下是我编写(伪)代码的方式:
async def process_file(input_file):
# load the file synchronously
with open(input_file) as fd:
data = json.load(fd)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub})
# save your result json synchronously
save_file(data, input_file)
trio
向您保证,一旦您退出 async with
块,您生成的每个 任务 都会完成,因此您可以安全地保存您的文件,因为不再更新将会发生。
我还删除了 grab_next_entry
函数,因为在我看来,这个函数将在每次调用时(递增地)迭代相同的键(给出 O(n!))复杂度,而您可以简化它只需迭代一次你的字典(将复杂度降低到 O(n))
您也不需要 Semaphore
,除非您想限制并行 post_update
调用的数量。但是 trio
也为此提供了一个内置机制,这要归功于它的 CapacityLimiter,您可以像这样使用它:
limit = trio.CapacityLimiter(10)
async with trio.open_nursery() as nursery:
async with limit:
for x in z:
nursery.start_soon(func, x)
更新感谢 @njsmith 的评论
所以,为了限制并发量post_update
你会这样重写:
async def post_update(data, limit):
async with limit:
...
然后你可以像这样重写之前的循环:
limit = trio.CapacityLimiter(10)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub}, limit)
这样,我们会为您的 data-dict 中的 n 个条目生成 n 个任务,但是如果有超过10 个任务 运行 并发,然后额外的任务将不得不等待释放限制(在 async with limit
块的末尾)。
此代码使用通道来复用工作人员池中的请求。我发现 (在您的代码注释中)post-response 速率受到限制,因此 read_entries
在每个 send
.
后休眠
from random import random
import time, asks, trio
snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)
async def read_entries():
async with snd_input:
for key_entry in range(10):
print("reading", key_entry)
await snd_input.send(key_entry)
await trio.sleep(1)
async def work(n):
async for key_entry in rcv_input:
print(f"w{n} {time.monotonic()} posting", key_entry)
r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
await snd_output.send((r.status_code, key_entry))
async def save_entries():
async for entry in rcv_output:
print("saving", entry)
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(read_entries)
nursery.start_soon(save_entries)
async with snd_output:
async with trio.open_nursery() as workers:
for n in range(3):
workers.start_soon(work, n)
trio.run(main)
我的要求是结构化的三重奏伪代码(实际的三重奏函数调用,但虚拟工作人员在此处填写),因此我可以理解并尝试在同步之间切换的良好流程控制实践和异步进程。
我想执行以下操作...
- 将 json-data 的文件加载到数据字典中
- 旁白:数据字典看起来像 { 'key_a': {(info_dict_a)}, 'key_b': {info_dict_b} }
- 让 n 名工人中的每一个...
- 访问该数据字典以查找下一个要处理的记录信息字典
- 从正在处理的记录中准备一些数据并post将数据url
- 处理 post-响应以更新正在处理记录的信息字典中的 'response' 键
- 用密钥的信息字典更新数据字典
- 用更新后的data-dict覆盖json-data的原始文件
旁白:我知道除了笨拙地重复重写 json 文件之外,还有其他方法可以实现我的总体目标——但我不要求输入;我真的很想充分理解 trio,以便能够将其用于 this 流程。
所以,我想要同步的进程:
- 获取下一条记录到进程的信息字典
- 更新数据字典
- 用更新后的数据字典覆盖json-data的原始文件
trio 的新手,我有工作代码 here ...我相信 是 同步获取下一个要处理的记录(通过使用 trio.Semaphore() 技术)。但我很确定我不会同步保存文件。
几年前学习 Go,我觉得我已经掌握了交织同步和异步调用的方法——但我还没有掌握三重奏。提前致谢。
以下是我编写(伪)代码的方式:
async def process_file(input_file):
# load the file synchronously
with open(input_file) as fd:
data = json.load(fd)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub})
# save your result json synchronously
save_file(data, input_file)
trio
向您保证,一旦您退出 async with
块,您生成的每个 任务 都会完成,因此您可以安全地保存您的文件,因为不再更新将会发生。
我还删除了 grab_next_entry
函数,因为在我看来,这个函数将在每次调用时(递增地)迭代相同的键(给出 O(n!))复杂度,而您可以简化它只需迭代一次你的字典(将复杂度降低到 O(n))
您也不需要 Semaphore
,除非您想限制并行 post_update
调用的数量。但是 trio
也为此提供了一个内置机制,这要归功于它的 CapacityLimiter,您可以像这样使用它:
limit = trio.CapacityLimiter(10)
async with trio.open_nursery() as nursery:
async with limit:
for x in z:
nursery.start_soon(func, x)
更新感谢 @njsmith 的评论
所以,为了限制并发量post_update
你会这样重写:
async def post_update(data, limit):
async with limit:
...
然后你可以像这样重写之前的循环:
limit = trio.CapacityLimiter(10)
# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
for key, sub in data.items():
if sub['updated'] is None:
sub['updated'] = 'in_progress'
nursery.start_soon(post_update, {key: sub}, limit)
这样,我们会为您的 data-dict 中的 n 个条目生成 n 个任务,但是如果有超过10 个任务 运行 并发,然后额外的任务将不得不等待释放限制(在 async with limit
块的末尾)。
此代码使用通道来复用工作人员池中的请求。我发现 read_entries
在每个 send
.
from random import random
import time, asks, trio
snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)
async def read_entries():
async with snd_input:
for key_entry in range(10):
print("reading", key_entry)
await snd_input.send(key_entry)
await trio.sleep(1)
async def work(n):
async for key_entry in rcv_input:
print(f"w{n} {time.monotonic()} posting", key_entry)
r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
await snd_output.send((r.status_code, key_entry))
async def save_entries():
async for entry in rcv_output:
print("saving", entry)
async def main():
async with trio.open_nursery() as nursery:
nursery.start_soon(read_entries)
nursery.start_soon(save_entries)
async with snd_output:
async with trio.open_nursery() as workers:
for n in range(3):
workers.start_soon(work, n)
trio.run(main)