为数据缓冲区创建类文件对象
Create file-like object for data buffer
情境化
我正在编写一个程序,该程序能够从传感器读取数据,然后对其进行处理。目前我希望它被发送到服务器。我有两个通过套接字进行通信的进程,一个读取数据并将其存储到临时文件,另一个读取临时文件,将数据发送到服务器。
问题
这个问题实际上从未在测试中出现过,但是我已经意识到,如果采样频率很高,两个进程很可能 重合以尝试 read/write同时文件(不是他们同时确切地请求它,而是一个试图在另一个关闭它之前打开它)。
即使这不会引发错误(对于我在网上阅读的内容,有些 OS 不会将锁放入文件中),它也可能导致巨大的版本不兼容错误,从而导致数据丢失。因此,这种处理数据的方式看起来不太合适。
我自己的idea/approach
我想在内存(数据缓冲区)中使用一个类似文件的对象。我在 Python 中没有这个概念的经验,所以我研究了一下,我明白 [a buffer] 就像一个文件,在程序执行时保存在内存中,并且具有非常相似的属性一个标准的系统文件。我认为使用它可能是个好主意,但是我找不到解决其中一些不便的方法:
既然还是like一个文件(file-like object),难道两个进程在对象,版本不兼容 errors/bugs 可以提高吗?我只需要 append 一个进程的数据(最后)和 remove 数据从另一个开始(作为某种队列).此 Python 功能是否允许这样做?如果允许,我可以在文档中查看哪些方法?
对于上面的解释,我想到了字面上使用队列;然而,这在时间上可能是低效的执行(附加到列表相当快,但根据我在自己的机器上进行的测试,附加到 pandas 对象要慢大约 1000 倍,以查看哪种对象类型最适合).是否有一个对象(如果不是类似文件的对象)可以让我这样做并且高效?我知道效率是主观的,所以假设每秒 100 次追加而没有明显的延迟(在这种情况下时间戳很重要)。
由于我使用了两个不同的进程并且它们在 Python 中不共享内存,是否仍然可以在对类文件对象进行操作时指向相同的内存地址?正如我所说,我用套接字与它们通信,但该方法是 afaik 按值调用,而不是引用;所以这对我来说是个严重的问题(也许有必要将它们合并到两个线程而不是不同的 python 进程?)
如果需要,您可以评论询问任何其他细节,我会很乐意回答。
编辑:评论中提出的问题:
How are you creating these processes? Through a Python module like
multiprocessing
or subprocess
, or some other way?
我运行他们是两个完全独立的程序。每个都有一个不同的主 python 文件,由 shell 脚本调用;但是,如果需要,我可以灵活地更改此行为。
另一方面,从传感器读取数据的进程有两个线程:一个实际读取数据,另一个侦听套接字请求。
what type of data are you getting from the sensor and sending to the
server?
我发送的表通常包含浮点数,但是传感器也可能产生视频流或其他类型的数据结构。
Misconception of Queue | pandas
我知道队列与数据帧无关;我只是说我尝试使用数据帧但它表现不佳,因为它被认为预先分配了它需要的内存 space(如果我是对的)。我只是表达了我对解决方案性能的担忧。
首先,您确实在考虑构建 io.BytesIO
已经完成的工作。它是一个完全存储在内存中的类似文件的对象。每个进程的对象都完全独立于每个其他进程的对象。这就是您想要的一切。但这对你没有任何好处。它是一个类似文件的对象这一事实并不意味着它可以从其他进程访问。事实上,这就是非文件类文件对象的全部 点 :它们不是文件。
但您可以明确锁定您的文件。
确实,除了 Windows,大多数操作系统都不会自动锁定文件,有些甚至没有“强制”锁,只有协作锁,它们实际上不会保护文件,除非所有的程序都是为使用锁而编写的。但这不是问题。
一个选项是为 Windows 和 Unix 编写单独的代码:在 Windows 上,依靠以独占模式打开文件;在 Unix 上,使用 flock
.
另一种选择是创建手动锁定文件。你可以原子地尝试创建一个文件,如果其他人首先在每个平台上创建它,只需使用 os.open
和 O_CREAT|O_EXCL
标志,你就可以失败,你可以在此基础上构建你需要的一切。
如果您正在考虑使用共享内存,除非您正在使用 multiprocessing
,否则以跨平台的方式这样做是非常痛苦的。
但是你可以通过使用普通文件并在每个进程中使用mmap
来访问该文件,就好像它是普通内存一样,可以获得相同的效果。只需确保只使用 length
和 access
的跨平台值(而不使用平台特定的参数,如 prot
或 flags
)并且它的工作原理相同无处不在。
当然你不能将 Python 对象放入共享内存或 mmap 中,但你可以将原始字节或“本机值”或它们的数组或 ctypes Structures
,或者最好的是,它们的多维 numpy 数组。除了最后一个,你可以使用 the appropriate wrapper objects out of multiprocessing
even if you aren’t otherwise using the module. For the last one, just use np.memmap
而不是直接使用 mmap
,它会处理所有事情。
但是,关于队列更快的说法您可能是对的。如果这真的是一个问题(尽管我实际上会构建并测试它以在解决它之前查看它是否是一个问题……),那就去做吧。不过你好像有些误解
首先,我不知道为什么你认为队列与附加到 pandas DataFrame 有任何关系。我想你可以使用 df 作为队列,但两者之间没有内在联系。
同时,列表适用于小型队列,但对于非常大的队列则不然。要么你追加到右边然后从左边弹出,要么你追加到左边然后从右边弹出。无论哪种方式,左侧的操作所花费的时间都与队列的大小成线性关系,因为您必须将 be 列表的其余部分向左或向右移动一个槽。解决办法是collections.deque
,一个几乎和列表一样的对象,除了它可以在常数时间在两边插入或删除,而不是只在右边。
但同样,这并没有解决任何问题,因为它实际上并没有以任何方式共享。您需要某种 interprocess 队列,而 DataFrame 和列表(也不是双端队列)都无济于事。
您可以在管道之上构建进程间队列。根据您的进程 运行,这可能是一个匿名管道,其中启动程序将管道的一端交给子程序,或者这可能是一个命名管道,这在 [=94] 上略有不同=] 与 Unix 相比,但在这两种情况下,它都由具有一些全局已知名称(如文件系统路径)的程序来打开同一个管道。
您还可以在 TCP 套接字之上构建进程间队列。如果绑定到localhost再连接到localhost,这几乎和管道一样高效,但是写跨平台更简单。
那么,如何在管道或套接字之上构建队列?唯一的问题是您只有字节流而不是消息流。
- 如果您的邮件大小相同,您只需在一侧
sendall
,然后循环 recv
,直到您有 MESSAGESIZE
个字节。
- 如果是像
pickle
这样的自定界格式,没有问题;一侧 sendall
,另一侧 recv
直到你有一个完整的泡菜。您甚至可以使用 socket.makefile
(当然,仅适用于套接字,不适用于管道)来获取可以直接传递给 pickle.dump‘ and
pickle.load` 的类似文件的对象。
- 您可以使用某种分隔符(例如,如果您的消息是永远不能包含换行符或永远不能包含 NUL 字节的文本,您可以只使用换行符或 0 作为分隔符——如果您使用换行符,
makefile
再次为您处理。
- 或者您可以在消息本身之前发送每条消息的大小(例如,使用像 netstring 这样的普通协议)。
如果您正在(或可能)使用 multiprocessing
库来控制所有单独的进程,它带有一个内置的 Queue
class,它构建了一个 IPC以一种有效的方式为每个主要平台在管道上发送泡菜之前排队,但您不必担心它是如何工作的;你只需将队列交给你的子进程,你可以在一端 put
和在另一端 get
它就可以了。
您误解了什么是类文件对象。 "File-like object" 描述了一个对象呈现的接口——像 read
或 write
这样的方法,以及逐行迭代。它没有说明是否将数据存储在内存中。常规文件对象是类文件对象。 OS 级管道的文件对象是类文件对象。 io.StringIO
和 io.BytesIO
对象是类文件对象,它们确实像您一直在想的那样工作。
与其考虑类文件对象,不如考虑要使用哪种 OS 级别的机制在进程之间进行通信。你已经有了套接字;为什么不使用套接字在进程之间发送数据?管道是另一种选择。共享内存是可能的,但依赖于平台且棘手;这可能不是最好的选择。
情境化
我正在编写一个程序,该程序能够从传感器读取数据,然后对其进行处理。目前我希望它被发送到服务器。我有两个通过套接字进行通信的进程,一个读取数据并将其存储到临时文件,另一个读取临时文件,将数据发送到服务器。
问题
这个问题实际上从未在测试中出现过,但是我已经意识到,如果采样频率很高,两个进程很可能 重合以尝试 read/write同时文件(不是他们同时确切地请求它,而是一个试图在另一个关闭它之前打开它)。
即使这不会引发错误(对于我在网上阅读的内容,有些 OS 不会将锁放入文件中),它也可能导致巨大的版本不兼容错误,从而导致数据丢失。因此,这种处理数据的方式看起来不太合适。
我自己的idea/approach
我想在内存(数据缓冲区)中使用一个类似文件的对象。我在 Python 中没有这个概念的经验,所以我研究了一下,我明白 [a buffer] 就像一个文件,在程序执行时保存在内存中,并且具有非常相似的属性一个标准的系统文件。我认为使用它可能是个好主意,但是我找不到解决其中一些不便的方法:
既然还是like一个文件(file-like object),难道两个进程在对象,版本不兼容 errors/bugs 可以提高吗?我只需要 append 一个进程的数据(最后)和 remove 数据从另一个开始(作为某种队列).此 Python 功能是否允许这样做?如果允许,我可以在文档中查看哪些方法?
对于上面的解释,我想到了字面上使用队列;然而,这在时间上可能是低效的执行(附加到列表相当快,但根据我在自己的机器上进行的测试,附加到 pandas 对象要慢大约 1000 倍,以查看哪种对象类型最适合).是否有一个对象(如果不是类似文件的对象)可以让我这样做并且高效?我知道效率是主观的,所以假设每秒 100 次追加而没有明显的延迟(在这种情况下时间戳很重要)。
由于我使用了两个不同的进程并且它们在 Python 中不共享内存,是否仍然可以在对类文件对象进行操作时指向相同的内存地址?正如我所说,我用套接字与它们通信,但该方法是 afaik 按值调用,而不是引用;所以这对我来说是个严重的问题(也许有必要将它们合并到两个线程而不是不同的 python 进程?)
如果需要,您可以评论询问任何其他细节,我会很乐意回答。
编辑:评论中提出的问题:
How are you creating these processes? Through a Python module like
multiprocessing
orsubprocess
, or some other way?
我运行他们是两个完全独立的程序。每个都有一个不同的主 python 文件,由 shell 脚本调用;但是,如果需要,我可以灵活地更改此行为。
另一方面,从传感器读取数据的进程有两个线程:一个实际读取数据,另一个侦听套接字请求。
what type of data are you getting from the sensor and sending to the server?
我发送的表通常包含浮点数,但是传感器也可能产生视频流或其他类型的数据结构。
Misconception of Queue | pandas
我知道队列与数据帧无关;我只是说我尝试使用数据帧但它表现不佳,因为它被认为预先分配了它需要的内存 space(如果我是对的)。我只是表达了我对解决方案性能的担忧。
首先,您确实在考虑构建 io.BytesIO
已经完成的工作。它是一个完全存储在内存中的类似文件的对象。每个进程的对象都完全独立于每个其他进程的对象。这就是您想要的一切。但这对你没有任何好处。它是一个类似文件的对象这一事实并不意味着它可以从其他进程访问。事实上,这就是非文件类文件对象的全部 点 :它们不是文件。
但您可以明确锁定您的文件。
确实,除了 Windows,大多数操作系统都不会自动锁定文件,有些甚至没有“强制”锁,只有协作锁,它们实际上不会保护文件,除非所有的程序都是为使用锁而编写的。但这不是问题。
一个选项是为 Windows 和 Unix 编写单独的代码:在 Windows 上,依靠以独占模式打开文件;在 Unix 上,使用 flock
.
另一种选择是创建手动锁定文件。你可以原子地尝试创建一个文件,如果其他人首先在每个平台上创建它,只需使用 os.open
和 O_CREAT|O_EXCL
标志,你就可以失败,你可以在此基础上构建你需要的一切。
如果您正在考虑使用共享内存,除非您正在使用 multiprocessing
,否则以跨平台的方式这样做是非常痛苦的。
但是你可以通过使用普通文件并在每个进程中使用mmap
来访问该文件,就好像它是普通内存一样,可以获得相同的效果。只需确保只使用 length
和 access
的跨平台值(而不使用平台特定的参数,如 prot
或 flags
)并且它的工作原理相同无处不在。
当然你不能将 Python 对象放入共享内存或 mmap 中,但你可以将原始字节或“本机值”或它们的数组或 ctypes Structures
,或者最好的是,它们的多维 numpy 数组。除了最后一个,你可以使用 the appropriate wrapper objects out of multiprocessing
even if you aren’t otherwise using the module. For the last one, just use np.memmap
而不是直接使用 mmap
,它会处理所有事情。
但是,关于队列更快的说法您可能是对的。如果这真的是一个问题(尽管我实际上会构建并测试它以在解决它之前查看它是否是一个问题……),那就去做吧。不过你好像有些误解
首先,我不知道为什么你认为队列与附加到 pandas DataFrame 有任何关系。我想你可以使用 df 作为队列,但两者之间没有内在联系。
同时,列表适用于小型队列,但对于非常大的队列则不然。要么你追加到右边然后从左边弹出,要么你追加到左边然后从右边弹出。无论哪种方式,左侧的操作所花费的时间都与队列的大小成线性关系,因为您必须将 be 列表的其余部分向左或向右移动一个槽。解决办法是collections.deque
,一个几乎和列表一样的对象,除了它可以在常数时间在两边插入或删除,而不是只在右边。
但同样,这并没有解决任何问题,因为它实际上并没有以任何方式共享。您需要某种 interprocess 队列,而 DataFrame 和列表(也不是双端队列)都无济于事。
您可以在管道之上构建进程间队列。根据您的进程 运行,这可能是一个匿名管道,其中启动程序将管道的一端交给子程序,或者这可能是一个命名管道,这在 [=94] 上略有不同=] 与 Unix 相比,但在这两种情况下,它都由具有一些全局已知名称(如文件系统路径)的程序来打开同一个管道。
您还可以在 TCP 套接字之上构建进程间队列。如果绑定到localhost再连接到localhost,这几乎和管道一样高效,但是写跨平台更简单。
那么,如何在管道或套接字之上构建队列?唯一的问题是您只有字节流而不是消息流。
- 如果您的邮件大小相同,您只需在一侧
sendall
,然后循环recv
,直到您有MESSAGESIZE
个字节。 - 如果是像
pickle
这样的自定界格式,没有问题;一侧sendall
,另一侧recv
直到你有一个完整的泡菜。您甚至可以使用socket.makefile
(当然,仅适用于套接字,不适用于管道)来获取可以直接传递给pickle.dump‘ and
pickle.load` 的类似文件的对象。 - 您可以使用某种分隔符(例如,如果您的消息是永远不能包含换行符或永远不能包含 NUL 字节的文本,您可以只使用换行符或 0 作为分隔符——如果您使用换行符,
makefile
再次为您处理。 - 或者您可以在消息本身之前发送每条消息的大小(例如,使用像 netstring 这样的普通协议)。
如果您正在(或可能)使用 multiprocessing
库来控制所有单独的进程,它带有一个内置的 Queue
class,它构建了一个 IPC以一种有效的方式为每个主要平台在管道上发送泡菜之前排队,但您不必担心它是如何工作的;你只需将队列交给你的子进程,你可以在一端 put
和在另一端 get
它就可以了。
您误解了什么是类文件对象。 "File-like object" 描述了一个对象呈现的接口——像 read
或 write
这样的方法,以及逐行迭代。它没有说明是否将数据存储在内存中。常规文件对象是类文件对象。 OS 级管道的文件对象是类文件对象。 io.StringIO
和 io.BytesIO
对象是类文件对象,它们确实像您一直在想的那样工作。
与其考虑类文件对象,不如考虑要使用哪种 OS 级别的机制在进程之间进行通信。你已经有了套接字;为什么不使用套接字在进程之间发送数据?管道是另一种选择。共享内存是可能的,但依赖于平台且棘手;这可能不是最好的选择。