如何使用 trio 构建开始和结束同步调用?

how to structure beginning and ending synchronous calls using trio?

我的要求是结构化的三重奏伪代码(实际的三重奏函数调用,但虚拟工作人员在此处填写),因此我可以理解并尝试在同步之间切换的良好流程控制实践和异步进程。

我想执行以下操作...

旁白:我知道除了笨拙地重复重写 json 文件之外,还有其他方法可以实现我的总体目标——但我不要求输入;我真的很想充分理解 trio,以便能够将其用于 this 流程。

所以,我想要同步的进程:

trio 的新手,我有工作代码 here ...我相信 同步获取下一个要处理的记录(通过使用 trio.Semaphore() 技术)。但我很确定我不会同步保存文件。

几年前学习 Go,我觉得我已经掌握了交织同步和异步调用的方法——但我还没有掌握三重奏。提前致谢。

以下是我编写(伪)代码的方式:

    async def process_file(input_file):
        # load the file synchronously
        with open(input_file) as fd:
            data = json.load(fd)

        # iterate over your dict asynchronously
        async with trio.open_nursery() as nursery:
            for key, sub in data.items():
                if sub['updated'] is None:
                    sub['updated'] = 'in_progress'
                    nursery.start_soon(post_update, {key: sub})

        # save your result json synchronously
        save_file(data, input_file)

trio 向您保证,一旦您退出 async with 块,您生成的每个 任务 都会完成,因此您可以安全地保存您的文件,因为不再更新将会发生。

我还删除了 grab_next_entry 函数,因为在我看来,这个函数将在每次调用时(递增地)迭代相同的键(给出 O(n!))复杂度,而您可以简化它只需迭代一次你的字典(将复杂度降低到 O(n))

您也不需要 Semaphore,除非您想限制并行 post_update 调用的数量。但是 trio 也为此提供了一个内置机制,这要归功于它的 CapacityLimiter,您可以像这样使用它:

    limit = trio.CapacityLimiter(10)
    async with trio.open_nursery() as nursery:
        async with limit:
            for x in z:
                nursery.start_soon(func, x)

更新感谢 @njsmith 的评论

所以,为了限制并发量post_update你会这样重写:

    async def post_update(data, limit):
        async with limit:
            ...

然后你可以像这样重写之前的循环:

    limit = trio.CapacityLimiter(10)
    # iterate over your dict asynchronously
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub}, limit)

这样,我们会为您的 data-dict 中的 n 个条目生成 n 个任务,但是如果有超过10 个任务 运行 并发,然后额外的任务将不得不等待释放限制(在 async with limit 块的末尾)。

此代码使用通道来复用工作人员池中的请求。我发现 (在您的代码注释中)post-response 速率受到限制,因此 read_entries 在每个 send.

后休眠
from random import random    
import time, asks, trio    

snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)    

async def read_entries():
    async with snd_input:
        for key_entry in range(10):
            print("reading", key_entry)    
            await snd_input.send(key_entry)    
            await trio.sleep(1)    

async def work(n):
    async for key_entry in rcv_input:    
        print(f"w{n} {time.monotonic()} posting", key_entry)    
        r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
        await snd_output.send((r.status_code, key_entry))

async def save_entries():    
    async for entry in rcv_output:    
        print("saving", entry)    

async def main():    
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_entries)    
        nursery.start_soon(save_entries)    
        async with snd_output:
            async with trio.open_nursery() as workers:
                for n in range(3):
                    workers.start_soon(work, n)

trio.run(main)