如果接收速度太快(缓冲问题?)

outReceived from twisted ProcessProtocol merges messages if received too fast (buffering problem?)

我正在使用 Klein,一个基于 twisted 的微型网络框架。我有一个服务器(运行 on windows!),它将通过 reactor.spawnProcess() 产生一个外部长 运行 进程(端到端测试)。 为了发送关于 运行 测试的状态信息,我实现了一个 ProcessProtocol:

class IPCProtocol(protocol.ProcessProtocol):
    def __init__(self, status: 'Status', history: 'History'):
        super().__init__()
        self.status: Status = status
        self.history: History = history
        self.pid = None

    def connectionMade(self):
        self.pid = self.transport.pid
        log.msg("process started, pid={}".format(self.pid))

    def processExited(self, reason):
        log.msg("process exited, status={}".format(reason.value.exitCode))
        # add current run to history
        self.history.add(self.status.current_run)
        # create empty testrun and save status
        self.status.current_run = Testrun()
        self.status.status = StatusEnum.ready
        self.status.save()
        # check for more queue items
        if not self.status.queue.is_empty():
            start_testrun()

    def outReceived(self, data: bytes):
        data = data.decode('utf-8').strip()
        if data.startswith(constants.LOG_PREFIX_FAILURE):
            self.failureReceived()
        if data.startswith(constants.LOG_PREFIX_SERVER):
            data = data[len(constants.LOG_PREFIX_SERVER):]
            log.msg("Testrunner: " + data)
            self.serverMsgReceived(data)

我使用以下命令启动该过程:

ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)

为了发送信息,我只是在 testrunner.py.

中打印消息(使用前缀来区分它们)

问题是,如果我将打印命令发送到快速,那么 outReceived 将合并消息。

我已经尝试在外部进程中为 print() 调用添加 flush=True,但这并没有解决问题。其他一些问题建议对 spawnProcess 使用 usePTY=True,但这在 windows 下不受支持。 有没有比为每个 print() 调用添加一个小延迟(如 time.sleep(0.1))更好的方法来解决这个问题?

你没说,但看起来子进程向其标准输出写入了行。 如果要对这些行进行操作,则需要解析输出以找到行边界。

您可以使用 LineOnlyReceiver 来帮助您。由于进程不是流传输,因此您不能直接使用 LineOnlyReceiver。您必须使其适应流程协议接口。您可以自己完成此操作,也可以使用 ProcessEndpoint(而不是 spawnProcess)为您完成。

例如:

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor

endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...