如果接收速度太快(缓冲问题?)
outReceived from twisted ProcessProtocol merges messages if received too fast (buffering problem?)
我正在使用 Klein,一个基于 twisted 的微型网络框架。我有一个服务器(运行 on windows!),它将通过 reactor.spawnProcess() 产生一个外部长 运行 进程(端到端测试)。
为了发送关于 运行 测试的状态信息,我实现了一个 ProcessProtocol:
class IPCProtocol(protocol.ProcessProtocol):
def __init__(self, status: 'Status', history: 'History'):
super().__init__()
self.status: Status = status
self.history: History = history
self.pid = None
def connectionMade(self):
self.pid = self.transport.pid
log.msg("process started, pid={}".format(self.pid))
def processExited(self, reason):
log.msg("process exited, status={}".format(reason.value.exitCode))
# add current run to history
self.history.add(self.status.current_run)
# create empty testrun and save status
self.status.current_run = Testrun()
self.status.status = StatusEnum.ready
self.status.save()
# check for more queue items
if not self.status.queue.is_empty():
start_testrun()
def outReceived(self, data: bytes):
data = data.decode('utf-8').strip()
if data.startswith(constants.LOG_PREFIX_FAILURE):
self.failureReceived()
if data.startswith(constants.LOG_PREFIX_SERVER):
data = data[len(constants.LOG_PREFIX_SERVER):]
log.msg("Testrunner: " + data)
self.serverMsgReceived(data)
我使用以下命令启动该过程:
ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)
为了发送信息,我只是在 testrunner.py.
中打印消息(使用前缀来区分它们)
问题是,如果我将打印命令发送到快速,那么 outReceived
将合并消息。
我已经尝试在外部进程中为 print()
调用添加 flush=True
,但这并没有解决问题。其他一些问题建议对 spawnProcess 使用 usePTY=True
,但这在 windows 下不受支持。
有没有比为每个 print()
调用添加一个小延迟(如 time.sleep(0.1)
)更好的方法来解决这个问题?
你没说,但看起来子进程向其标准输出写入了行。
如果要对这些行进行操作,则需要解析输出以找到行边界。
您可以使用 LineOnlyReceiver
来帮助您。由于进程不是流传输,因此您不能直接使用 LineOnlyReceiver
。您必须使其适应流程协议接口。您可以自己完成此操作,也可以使用 ProcessEndpoint
(而不是 spawnProcess
)为您完成。
例如:
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor
endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...
我正在使用 Klein,一个基于 twisted 的微型网络框架。我有一个服务器(运行 on windows!),它将通过 reactor.spawnProcess() 产生一个外部长 运行 进程(端到端测试)。 为了发送关于 运行 测试的状态信息,我实现了一个 ProcessProtocol:
class IPCProtocol(protocol.ProcessProtocol):
def __init__(self, status: 'Status', history: 'History'):
super().__init__()
self.status: Status = status
self.history: History = history
self.pid = None
def connectionMade(self):
self.pid = self.transport.pid
log.msg("process started, pid={}".format(self.pid))
def processExited(self, reason):
log.msg("process exited, status={}".format(reason.value.exitCode))
# add current run to history
self.history.add(self.status.current_run)
# create empty testrun and save status
self.status.current_run = Testrun()
self.status.status = StatusEnum.ready
self.status.save()
# check for more queue items
if not self.status.queue.is_empty():
start_testrun()
def outReceived(self, data: bytes):
data = data.decode('utf-8').strip()
if data.startswith(constants.LOG_PREFIX_FAILURE):
self.failureReceived()
if data.startswith(constants.LOG_PREFIX_SERVER):
data = data[len(constants.LOG_PREFIX_SERVER):]
log.msg("Testrunner: " + data)
self.serverMsgReceived(data)
我使用以下命令启动该过程:
ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)
为了发送信息,我只是在 testrunner.py.
中打印消息(使用前缀来区分它们)问题是,如果我将打印命令发送到快速,那么 outReceived
将合并消息。
我已经尝试在外部进程中为 print()
调用添加 flush=True
,但这并没有解决问题。其他一些问题建议对 spawnProcess 使用 usePTY=True
,但这在 windows 下不受支持。
有没有比为每个 print()
调用添加一个小延迟(如 time.sleep(0.1)
)更好的方法来解决这个问题?
你没说,但看起来子进程向其标准输出写入了行。 如果要对这些行进行操作,则需要解析输出以找到行边界。
您可以使用 LineOnlyReceiver
来帮助您。由于进程不是流传输,因此您不能直接使用 LineOnlyReceiver
。您必须使其适应流程协议接口。您可以自己完成此操作,也可以使用 ProcessEndpoint
(而不是 spawnProcess
)为您完成。
例如:
from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor
endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...