如何在两个 python 程序之间发送、缓冲和接收 Python 对象?
How to send, buffer, and receive Python objects between two python programs?
我有以下场景。当每个数据流过时,数据(在这种情况下为数据包)由 Python 函数实时接收处理。所以每个数据都被接收并翻译成一个 python 对象。对该对象进行了轻量级算法,returns 输出(小字典)。然后丢弃该对象并处理下一个对象。我有那个程序 运行。
现在,算法将为每个对象生成一个小的输出数据字典。该词典需要通过单独的第二种算法进行处理(也是实时处理)。我设想我的代码 运行 两个进程。对于第一个输出,我需要第二个过程 "listen"。
那么我如何在 python 中编写第二个算法,以便它可以侦听并接受第一个算法生成的数据?举一个具体的例子,假设第一个算法应用时间戳,然后传递给缓冲区,第二个算法侦听——它从缓冲区中获取并处理它。如果缓冲区中没有任何内容,那么一旦出现它就会处理它。
如果您需要使用不同的进程(而不是在单个进程中使用多个函数),也许消息队列适合您?因此,您的第一个进程将执行它所做的任何操作,并将结果放入您的第二个进程正在侦听的消息队列中。
显然有很多选项可用,但根据您的描述,这听起来是一种合理的方法。
我知道这应该是一条评论,因为它并没有真正回答你的问题,而是指出了一个替代方案,但我无法将其放入评论中,所以这里是:
在进程之外构建管道看起来是一个干净的解决方案,但有一些问题需要考虑:
使用IPC/消息队列解决方案时出现的问题
如果您不使用共享内存,则必须在进程之间将数据作为消息传递。这意味着每个进程都必须反序列化它的输入并序列化它的输出,只是为了对中间结构进行简单的转换。你只是在忙于序列化和反序列化工作,这会损失很多效率。
假设您在瀑布模型中设计系统(第一个进程将数据传递给第二个进程,第二个进程将数据传递给第三个进程,等等),如果链中某处有单个进程死亡,你的整个管道中断。所以它不一定比单进程管道更健壮(事实上,我希望它比单进程解决方案更频繁地中断)
单进程流水线
您希望管道易于调整。所以:确保你有一个内存中的数据格式,每个 "transformator" 都可以读取(例如,一个字典,其中包含一些每个转换程序都能识别的标准化属性,以知道它是否可以在上面工作。
然后将每个变换器写入它自己的模块中(这有助于使它们易于(单元)自行测试,只需很少的额外脚手架)。让您的主应用程序导入您需要的所有变换器模块,并使用变换器注册表(基本上是要调用的函数列表)注册它们的变换器。在主应用程序中,您读取输入,将其转换为内存中的字典,运行 其上所有已注册的变换器按顺序输出结果。
这仍然很容易测试和调整,因为您可以简单地更改导入和注册的转换器。
您甚至可以在进程 运行ning 期间更改配置(哪些转换器是 运行),方法是存储要在文件中导入和注册哪些转换器以及是否要调整的信息当进程正在 运行ning 时,你向它发送一个 SIGHUP(在 Unix 上,无论如何 - 在 windows 上你将不得不找到另一种方式来向进程发出信号)这将导致它重新读取它是配置文件并调整它 运行 在数据上的变换器列表。这并不难实现。
如果您不需要 运行ning 在多核上为您提供的速度,这会更高效、不太可能中断并且更容易推理和支持。
基本想法看起来有点像这样(没有所有花里胡哨的东西):
# this is transformator_a.py
def transform(data):
return do_very_cool_stuff_to(data)
# this is transformator_b.py
import random
def transform(data):
if can_i_handle_this(data):
return do_some_more_cool_stuff_to(data)
else:
return data
def can_i_handle_this(data):
# this is a very picky transformator - it's not always sure it
# can handle the data
i = random.randint(1, 5)
return i == 2
# this is the main app
import transformator_a
import transformator_b
registered_transformators = [ transformator_a.transform,
transformator_b.transform ]
while True:
data = deserialize_data_unit_from_external_input_stream()
for transform in registered_transformators:
try:
data = transform.transform(data)
except SomeExcpetionsYouWantToHandle:
# might not be fatal to the rest of the pipeline.
# check whether data is still viable,
# continue if so, exit process if not - or you might
# just drop this data unit
ensure_we_have_viable_data(data)
serialize_data_unit_to_output_stream(data)
我有以下场景。当每个数据流过时,数据(在这种情况下为数据包)由 Python 函数实时接收处理。所以每个数据都被接收并翻译成一个 python 对象。对该对象进行了轻量级算法,returns 输出(小字典)。然后丢弃该对象并处理下一个对象。我有那个程序 运行。
现在,算法将为每个对象生成一个小的输出数据字典。该词典需要通过单独的第二种算法进行处理(也是实时处理)。我设想我的代码 运行 两个进程。对于第一个输出,我需要第二个过程 "listen"。
那么我如何在 python 中编写第二个算法,以便它可以侦听并接受第一个算法生成的数据?举一个具体的例子,假设第一个算法应用时间戳,然后传递给缓冲区,第二个算法侦听——它从缓冲区中获取并处理它。如果缓冲区中没有任何内容,那么一旦出现它就会处理它。
如果您需要使用不同的进程(而不是在单个进程中使用多个函数),也许消息队列适合您?因此,您的第一个进程将执行它所做的任何操作,并将结果放入您的第二个进程正在侦听的消息队列中。
显然有很多选项可用,但根据您的描述,这听起来是一种合理的方法。
我知道这应该是一条评论,因为它并没有真正回答你的问题,而是指出了一个替代方案,但我无法将其放入评论中,所以这里是:
在进程之外构建管道看起来是一个干净的解决方案,但有一些问题需要考虑:
使用IPC/消息队列解决方案时出现的问题
如果您不使用共享内存,则必须在进程之间将数据作为消息传递。这意味着每个进程都必须反序列化它的输入并序列化它的输出,只是为了对中间结构进行简单的转换。你只是在忙于序列化和反序列化工作,这会损失很多效率。
假设您在瀑布模型中设计系统(第一个进程将数据传递给第二个进程,第二个进程将数据传递给第三个进程,等等),如果链中某处有单个进程死亡,你的整个管道中断。所以它不一定比单进程管道更健壮(事实上,我希望它比单进程解决方案更频繁地中断)
单进程流水线
您希望管道易于调整。所以:确保你有一个内存中的数据格式,每个 "transformator" 都可以读取(例如,一个字典,其中包含一些每个转换程序都能识别的标准化属性,以知道它是否可以在上面工作。
然后将每个变换器写入它自己的模块中(这有助于使它们易于(单元)自行测试,只需很少的额外脚手架)。让您的主应用程序导入您需要的所有变换器模块,并使用变换器注册表(基本上是要调用的函数列表)注册它们的变换器。在主应用程序中,您读取输入,将其转换为内存中的字典,运行 其上所有已注册的变换器按顺序输出结果。
这仍然很容易测试和调整,因为您可以简单地更改导入和注册的转换器。
您甚至可以在进程 运行ning 期间更改配置(哪些转换器是 运行),方法是存储要在文件中导入和注册哪些转换器以及是否要调整的信息当进程正在 运行ning 时,你向它发送一个 SIGHUP(在 Unix 上,无论如何 - 在 windows 上你将不得不找到另一种方式来向进程发出信号)这将导致它重新读取它是配置文件并调整它 运行 在数据上的变换器列表。这并不难实现。
如果您不需要 运行ning 在多核上为您提供的速度,这会更高效、不太可能中断并且更容易推理和支持。
基本想法看起来有点像这样(没有所有花里胡哨的东西):
# this is transformator_a.py
def transform(data):
return do_very_cool_stuff_to(data)
# this is transformator_b.py
import random
def transform(data):
if can_i_handle_this(data):
return do_some_more_cool_stuff_to(data)
else:
return data
def can_i_handle_this(data):
# this is a very picky transformator - it's not always sure it
# can handle the data
i = random.randint(1, 5)
return i == 2
# this is the main app
import transformator_a
import transformator_b
registered_transformators = [ transformator_a.transform,
transformator_b.transform ]
while True:
data = deserialize_data_unit_from_external_input_stream()
for transform in registered_transformators:
try:
data = transform.transform(data)
except SomeExcpetionsYouWantToHandle:
# might not be fatal to the rest of the pipeline.
# check whether data is still viable,
# continue if so, exit process if not - or you might
# just drop this data unit
ensure_we_have_viable_data(data)
serialize_data_unit_to_output_stream(data)