在独立的 运行 python 个脚本之间共享 python 个对象(例如 Pandas Dataframe)
Sharing python objects (e.g. Pandas Dataframe) between independently running python scripts
这是我的第一个问题,我希望我不会提出一个与现有问题非常相似的问题。如果是这样,请见谅!
所以,我遇到的情况如下:
我想 运行 独立的 python 并行脚本,它可以访问相同的 python 对象,在我的例子中是 Pandas Dataframe。我的想法是,一个脚本基本上不断 运行ning 并订阅数据流(这里的数据是通过 websocket 推送的),然后将其附加到共享数据帧。第二个脚本应该能够独立于第一个脚本启动,并且仍然可以访问由第一个脚本不断更新的 Dataframe。在第二个脚本中,我想在预定义的时间间隔内执行不同类型的分析,或者对实时数据执行其他相对耗时的操作。
我已经尝试 运行 从一个脚本中执行所有操作,但我一直与 websocket 断开连接。未来还会有多个脚本来实时访问共享数据。
与其在脚本 1 中每次更新后都保存一个 csv 文件或 pickle,我宁愿有一个解决方案,其中两个脚本基本上共享相同的内存。我也只需要一个脚本来编写和附加到 Dataframe,另一个只需要从中读取。多处理模块似乎有一些有趣的特性,可能会有所帮助,但到目前为止我还没有真正理解它。我还查看了全局变量,但在这种情况下使用它似乎也不正确。
我想象的是这样的(我知道,代码是完全错误的,这只是为了说明目的):
第一个脚本应不断将数据流中的新数据分配给数据帧并共享此对象。
from share_data import share
shared_df = pd.DataFrame()
for data from datastream:
shared_df.append(data)
share(shared_df)
第二个脚本应该能够执行以下操作:
from share_data import get
df = get(shared_df)
这完全有可能吗?或者您对如何以简单的方式实现这一点有任何想法吗?或者您有什么建议可以使用哪些软件包?
此致,
奥莱
您已经非常清楚可以使用您的数据做什么。
最佳方案取决于您的实际需求,
所以我将尝试用一个工作示例来涵盖各种可能性。
你想要什么
如果我完全理解你的需求,你想要
- 持续更新 DataFrame(从 websocket)
- 同时对同一个 DataFrame 进行一些计算
- 在计算工作者上保持 DataFrame 最新,
- 一次计算 CPU 密集
- 另一个不是。
你需要什么
正如您所说,您将需要一种方法来 运行 不同的线程或进程以保持计算 运行ning。
线程如何
实现所需内容的最简单方法是使用 threading 库。
由于线程可以共享内存,并且只有一个工作人员实际更新 DataFrame,因此很容易提出一种管理数据最新的方法:
import time
from dataclasses import dataclass
import pandas
from threading import Thread
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
class StreamLoader:
"""This class is our worker communicating with the websocket"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(5):
self.update_df()
print("[1] Updated DF %s" % str(self.df_holder.dataframe))
time.sleep(3)
class LightComputation:
"""This class is a random computation worker"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def compute(self):
print("[2] Current DF %s" % str(self.df_holder.dataframe))
def run(self):
# limit loop for the showcase
for _ in range(5):
self.compute()
time.sleep(5)
def main():
# We create a DataFrameHolder to keep our DataFrame available.
df_holder = DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Thread(target=compute.run)
compute_process.start()
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
请注意,我们使用 class 来保存当前 DataFrame 的引用,因为 append
等某些操作不一定就位,
因此,如果我们直接发送对 DataFrame 对象的引用,修改将在 worker 上丢失。
这里 DataFrameHolder
对象将在内存中保持相同的位置,因此 worker 仍然可以访问更新后的 DataFrame。
进程可能更强大
现在,如果您需要更多的计算能力,进程可能会更有用,因为它们使您能够将工作人员隔离在不同的核心上。
要在 python 中启动进程而不是线程,您可以使用 multiprocessing 库。
两个对象的 API 是相同的,您只需更改构造函数如下
from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)
from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)
现在,如果您尝试更改上述脚本中的值,您将看到 DataFrame 未正确更新。
为此,您需要做更多的工作,因为这两个进程不共享内存,而且它们之间有多种通信方式 (https://en.wikipedia.org/wiki/Inter-process_communication)
@SimonCrane 的参考在这些问题上非常有趣,并展示了使用 multiprocessing.manager.
在两个进程之间使用共享内存
这是一个版本,其中工作人员使用具有共享内存的单独进程:
import logging
import multiprocessing
import time
from dataclasses import dataclass
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread
import pandas
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
def retrieve(self):
return self.dataframe
class DataFrameManager(BaseManager):
"""This dataclass handles shared DataFrameHolder.
See https://docs.python.org/3/library/multiprocessing.html#examples
"""
# You can also use a socket file '/tmp/shared_df'
MANAGER_ADDRESS = ('localhost', 6000)
MANAGER_AUTH = b"auth"
def __init__(self) -> None:
super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)
self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
@classmethod
def register_dataframe(cls):
BaseManager.register("DataFrameHolder", DataFrameHolder)
class DFWorker:
"""Abstract class initializing a worker depending on a DataFrameHolder."""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
class StreamLoader(DFWorker):
"""This class is our worker communicating with the websocket"""
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(4):
self.update_df()
print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
time.sleep(3)
class LightComputation(DFWorker):
"""This class is a random computation worker"""
def compute(self):
print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))
def run(self):
# limit loop for the showcase
for _ in range(4):
self.compute()
time.sleep(5)
def main():
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
# Register our DataFrameHolder type in the DataFrameManager.
DataFrameManager.register_dataframe()
manager = DataFrameManager()
manager.start()
# We create a managed DataFrameHolder to keep our DataFrame available.
df_holder = manager.DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Process(target=compute.run)
compute_process.start()
# The managed dataframe is updated in every Thread/Process
time.sleep(5)
print("[0] Main process DF\n%s" % df_holder.retrieve())
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
如您所见,线程化和处理之间的差异非常小。
通过一些调整,如果您想使用不同的文件来处理您的 CPU 密集处理,您可以从那里开始连接到同一个管理器。
这是我的第一个问题,我希望我不会提出一个与现有问题非常相似的问题。如果是这样,请见谅!
所以,我遇到的情况如下:
我想 运行 独立的 python 并行脚本,它可以访问相同的 python 对象,在我的例子中是 Pandas Dataframe。我的想法是,一个脚本基本上不断 运行ning 并订阅数据流(这里的数据是通过 websocket 推送的),然后将其附加到共享数据帧。第二个脚本应该能够独立于第一个脚本启动,并且仍然可以访问由第一个脚本不断更新的 Dataframe。在第二个脚本中,我想在预定义的时间间隔内执行不同类型的分析,或者对实时数据执行其他相对耗时的操作。
我已经尝试 运行 从一个脚本中执行所有操作,但我一直与 websocket 断开连接。未来还会有多个脚本来实时访问共享数据。
与其在脚本 1 中每次更新后都保存一个 csv 文件或 pickle,我宁愿有一个解决方案,其中两个脚本基本上共享相同的内存。我也只需要一个脚本来编写和附加到 Dataframe,另一个只需要从中读取。多处理模块似乎有一些有趣的特性,可能会有所帮助,但到目前为止我还没有真正理解它。我还查看了全局变量,但在这种情况下使用它似乎也不正确。
我想象的是这样的(我知道,代码是完全错误的,这只是为了说明目的):
第一个脚本应不断将数据流中的新数据分配给数据帧并共享此对象。
from share_data import share
shared_df = pd.DataFrame()
for data from datastream:
shared_df.append(data)
share(shared_df)
第二个脚本应该能够执行以下操作:
from share_data import get
df = get(shared_df)
这完全有可能吗?或者您对如何以简单的方式实现这一点有任何想法吗?或者您有什么建议可以使用哪些软件包?
此致, 奥莱
您已经非常清楚可以使用您的数据做什么。
最佳方案取决于您的实际需求, 所以我将尝试用一个工作示例来涵盖各种可能性。
你想要什么
如果我完全理解你的需求,你想要
- 持续更新 DataFrame(从 websocket)
- 同时对同一个 DataFrame 进行一些计算
- 在计算工作者上保持 DataFrame 最新,
- 一次计算 CPU 密集
- 另一个不是。
你需要什么
正如您所说,您将需要一种方法来 运行 不同的线程或进程以保持计算 运行ning。
线程如何
实现所需内容的最简单方法是使用 threading 库。 由于线程可以共享内存,并且只有一个工作人员实际更新 DataFrame,因此很容易提出一种管理数据最新的方法:
import time
from dataclasses import dataclass
import pandas
from threading import Thread
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
class StreamLoader:
"""This class is our worker communicating with the websocket"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(5):
self.update_df()
print("[1] Updated DF %s" % str(self.df_holder.dataframe))
time.sleep(3)
class LightComputation:
"""This class is a random computation worker"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def compute(self):
print("[2] Current DF %s" % str(self.df_holder.dataframe))
def run(self):
# limit loop for the showcase
for _ in range(5):
self.compute()
time.sleep(5)
def main():
# We create a DataFrameHolder to keep our DataFrame available.
df_holder = DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Thread(target=compute.run)
compute_process.start()
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
请注意,我们使用 class 来保存当前 DataFrame 的引用,因为 append
等某些操作不一定就位,
因此,如果我们直接发送对 DataFrame 对象的引用,修改将在 worker 上丢失。
这里 DataFrameHolder
对象将在内存中保持相同的位置,因此 worker 仍然可以访问更新后的 DataFrame。
进程可能更强大
现在,如果您需要更多的计算能力,进程可能会更有用,因为它们使您能够将工作人员隔离在不同的核心上。 要在 python 中启动进程而不是线程,您可以使用 multiprocessing 库。 两个对象的 API 是相同的,您只需更改构造函数如下
from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)
from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)
现在,如果您尝试更改上述脚本中的值,您将看到 DataFrame 未正确更新。
为此,您需要做更多的工作,因为这两个进程不共享内存,而且它们之间有多种通信方式 (https://en.wikipedia.org/wiki/Inter-process_communication)
@SimonCrane 的参考在这些问题上非常有趣,并展示了使用 multiprocessing.manager.
在两个进程之间使用共享内存这是一个版本,其中工作人员使用具有共享内存的单独进程:
import logging
import multiprocessing
import time
from dataclasses import dataclass
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread
import pandas
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
def retrieve(self):
return self.dataframe
class DataFrameManager(BaseManager):
"""This dataclass handles shared DataFrameHolder.
See https://docs.python.org/3/library/multiprocessing.html#examples
"""
# You can also use a socket file '/tmp/shared_df'
MANAGER_ADDRESS = ('localhost', 6000)
MANAGER_AUTH = b"auth"
def __init__(self) -> None:
super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)
self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
@classmethod
def register_dataframe(cls):
BaseManager.register("DataFrameHolder", DataFrameHolder)
class DFWorker:
"""Abstract class initializing a worker depending on a DataFrameHolder."""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
class StreamLoader(DFWorker):
"""This class is our worker communicating with the websocket"""
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(4):
self.update_df()
print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
time.sleep(3)
class LightComputation(DFWorker):
"""This class is a random computation worker"""
def compute(self):
print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))
def run(self):
# limit loop for the showcase
for _ in range(4):
self.compute()
time.sleep(5)
def main():
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
# Register our DataFrameHolder type in the DataFrameManager.
DataFrameManager.register_dataframe()
manager = DataFrameManager()
manager.start()
# We create a managed DataFrameHolder to keep our DataFrame available.
df_holder = manager.DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Process(target=compute.run)
compute_process.start()
# The managed dataframe is updated in every Thread/Process
time.sleep(5)
print("[0] Main process DF\n%s" % df_holder.retrieve())
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
如您所见,线程化和处理之间的差异非常小。
通过一些调整,如果您想使用不同的文件来处理您的 CPU 密集处理,您可以从那里开始连接到同一个管理器。