如何从三重奏 ReceiveStream 一次读取一行?

How can I read one line at a time from a trio ReceiveStream?

asyncio 有 StreamReader.readline(),允许这样的东西:

while True:
    line = await reader.readline()
    ...

(我没有看到 async for 在 asyncio 中可用,但那将是明显的演变)

如何使用 trio 实现等效?

我在 trio 0.9 中没有直接看到对此有任何高级支持。我所看到的是 ReceiveStream.receive_some() 其中 returns 任意大小的二进制块;对我来说,解码并将其转换为行式的东西似乎很重要。有没有我可以使用的标准库函数或代码片段?我发现 io stdlib 模块看起来很有前途,但我看不出有什么方法可以提供 "feed" 方法。

我写完了这个。未经过适当测试(欢迎修正错误),但它似乎有效:

class LineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                buf = buf[newline_idx+1:]
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

然后我可以用 LineReader 包装 ReceiveStream 并使用它的 readline 方法。添加 __aiter__()__anext()__ 将是微不足道的,但在我的情况下我不需要它(我正在将一些东西移植到不使用 async for 的 trio)。

另一个缺陷是它假定 UTF-8 或类似的编码,其中 b'\n' 换行符存在于未修改的编码字节对象中。

虽然依靠库函数来处理这个会很好;其他答案表示赞赏。

你说得对,目前 Trio 中没有对此的高级支持。应该有 something,虽然我不是 100% 确定它应该是什么样子。我打开an issue讨论一下

与此同时,您的实施看起来很合理。

如果你想让它更健壮,你可以 (1) 使用 bytearray 而不是 bytes 作为你的缓冲区,使附加和删除摊销 O(n) 而不是O(n^2),(2) 限制最大行长度,所以邪恶的同行不能强迫你浪费无限内存缓冲无限长的行,(3) 在最后一个停止的地方,而不是每次都从头开始,再次避免 O(n^2) 行为。 None 如果您只处理合理的行长度和行为良好的同伴,那么这一点非常重要,但也没有坏处。

这是您的代码的一个调整版本,它试图结合这三个想法:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(随意使用您喜欢的任何许可。)

我正在使用的一种非常幼稚的方法:

async def readline(stdout: trio.abc.ReceiveStream):
    data = b""
    while True:
        _data = await stdout.receive_some()
        if _data == b"":
            break
        data += _data
        if data.endswith(b"\n"):
            break
    return data

# use it like this:
async def fn():
    async with await trio.open_process(..., stdout=subprocess.PIPE) as process:
        while True:
            # instead of:
            #   data = process.stdout.receive_some()
            # use this:
            line = await readline(process.stdout)
            if line == b"":
                break