为 numpy 数组和多处理定义在内存中重叠的 Ctypes 数组
Define Ctypes array that overlaps in memory for numpy array and multiprocessing
如何定义一个 ctype 数组缓冲区,它可以在一个时间点保存多个 numpy 浮点数组(比如 A、B、C),然后在另一个时间点保存多个 numpy 整数数组(比如 D、E) ?这可以通过 python 中的 ctypes、numpy 或多处理的某种组合来完成吗?
谢谢。我正在尝试使用更少的内存。
首先,您的程序是否占用过多内存?如果答案是 "no" 或 "I'm not sure",那么忽略这个可以继续,直到你知道你确实有问题。
对不同的数组使用相同的缓冲区
您可以使用 numpy 中提供的 "views" 来完成所有您想做的事情。视图只是查看相同数据的不同方式。例如,
import numpy as np
ints32 = np.array([0, 0, 0, 0], dtype="<i4") # dtype string means little endian 4 byte ints
assert len(ints32) == 4
ints16 = ints32.view(dtype="<i2")
assert len(ints16) == 8 # 32-bit ints need half as much space as a 32-bit int
ints32[0] = 0x11223344
assert ints16[0] == 0x3344
print(ints16) # prints [13124 4386 0 0 0 0 0 0]
# Thus, showing ints16 is backed by the same memory as ints32
如果您愿意,也可以使用外部缓冲区
buffer = bytearray(8)
floats32 = np.frombuffer(buffer, dtype="<f4")
floats32[0] = 1
print(buffer) # shows buffer has been modified
您需要小心,因为您最终可能会出现对齐错误:
buf = np.zeros(3, dtype=np.int8) # 3 byte buffer
arr = buf.view(dtype=np.int16) # Error! Needs a buffer with multiples of 2 bytes
two_byte_slice = buf[:2]
arr = two_byte_slice.view(dtype=np.int16) # Succeeds
arr[0] = 1
assert buf[0] == 1 # shows that two_byte_slice and arr are not copies of buf
与不同进程或 C 库共享同一个缓冲区
与 C 库或其他进程共享缓冲区会带来一定的风险。通常仅通过立即复制缓冲区并仅使用它来减轻这种风险。但是,小心管理,您仍然可以安全。
要与 C 库共享缓冲区,您必须确保:
- 在 Python 释放缓冲区后,C 库不会保留指向输入缓冲区的指针。如果 C 库在函数 returns 之后不保留对缓冲区的引用,或者如果您保留对拥有对象的全局引用,这隐含地很好。
与另一个进程共享数据更复杂。但也可以做到安全。
- 如果任何派生进程打算比其父进程存活得更久,则它会从缓冲区复制数据而不是直接使用缓冲区。
- 如果两个或多个进程打算共享一个缓冲区,但同时工作,那么它们会表现良好,因为分配了一个锁来保护对缓冲区的访问,并且进程会观察这个锁。
请参阅以下与另一个进程共享缓冲区并使用锁来同步访问的示例(严格来说,锁不是必需的,因为父进程会等待子进程完成后再继续)。
import numpy as np
import ctypes
from multiprocessing import Array, Process
def main():
buf = Array(ctypes.c_int8, 10) # 10 byte buffer
with buf: # acquire lock
ctypes_arr = buf.get_obj()
arr = np.frombuffer(ctypes_arr, dtype=np.int16) # int16 array, with size 5
total = arr.sum()
del arr, ctypes_arr # losing lock, delete local reference to the buffer
print("total before:", total) # 0
p = Process(target=subprocess_target, args=(buf,))
p.start()
p.join()
with buf:
# interpret first 8 bytes as two 4 byte ints
view = memoryview(buf.get_obj())[:8]
arr = np.frombuffer(view, dtype=np.int32)
total = arr.sum()
del arr, view
print("total after:", total) # 262146
raw_bytes = list(buf.get_obj())
assert raw_bytes == [0, 0, 1, 0, 2, 0, 3, 0, 4, 0]
def subprocess_target(buf):
"""Sets elements in buf to [0, 1, ..., n-2, n-1]"""
with buf:
arr = np.frombuffer(buf.get_obj(), dtype=np.int16)
arr[:] = range(len(arr))
del arr
if __name__ == "__main__":
main()
如何定义一个 ctype 数组缓冲区,它可以在一个时间点保存多个 numpy 浮点数组(比如 A、B、C),然后在另一个时间点保存多个 numpy 整数数组(比如 D、E) ?这可以通过 python 中的 ctypes、numpy 或多处理的某种组合来完成吗?
谢谢。我正在尝试使用更少的内存。
首先,您的程序是否占用过多内存?如果答案是 "no" 或 "I'm not sure",那么忽略这个可以继续,直到你知道你确实有问题。
对不同的数组使用相同的缓冲区
您可以使用 numpy 中提供的 "views" 来完成所有您想做的事情。视图只是查看相同数据的不同方式。例如,
import numpy as np
ints32 = np.array([0, 0, 0, 0], dtype="<i4") # dtype string means little endian 4 byte ints
assert len(ints32) == 4
ints16 = ints32.view(dtype="<i2")
assert len(ints16) == 8 # 32-bit ints need half as much space as a 32-bit int
ints32[0] = 0x11223344
assert ints16[0] == 0x3344
print(ints16) # prints [13124 4386 0 0 0 0 0 0]
# Thus, showing ints16 is backed by the same memory as ints32
如果您愿意,也可以使用外部缓冲区
buffer = bytearray(8)
floats32 = np.frombuffer(buffer, dtype="<f4")
floats32[0] = 1
print(buffer) # shows buffer has been modified
您需要小心,因为您最终可能会出现对齐错误:
buf = np.zeros(3, dtype=np.int8) # 3 byte buffer
arr = buf.view(dtype=np.int16) # Error! Needs a buffer with multiples of 2 bytes
two_byte_slice = buf[:2]
arr = two_byte_slice.view(dtype=np.int16) # Succeeds
arr[0] = 1
assert buf[0] == 1 # shows that two_byte_slice and arr are not copies of buf
与不同进程或 C 库共享同一个缓冲区
与 C 库或其他进程共享缓冲区会带来一定的风险。通常仅通过立即复制缓冲区并仅使用它来减轻这种风险。但是,小心管理,您仍然可以安全。 要与 C 库共享缓冲区,您必须确保:
- 在 Python 释放缓冲区后,C 库不会保留指向输入缓冲区的指针。如果 C 库在函数 returns 之后不保留对缓冲区的引用,或者如果您保留对拥有对象的全局引用,这隐含地很好。
与另一个进程共享数据更复杂。但也可以做到安全。
- 如果任何派生进程打算比其父进程存活得更久,则它会从缓冲区复制数据而不是直接使用缓冲区。
- 如果两个或多个进程打算共享一个缓冲区,但同时工作,那么它们会表现良好,因为分配了一个锁来保护对缓冲区的访问,并且进程会观察这个锁。
请参阅以下与另一个进程共享缓冲区并使用锁来同步访问的示例(严格来说,锁不是必需的,因为父进程会等待子进程完成后再继续)。
import numpy as np
import ctypes
from multiprocessing import Array, Process
def main():
buf = Array(ctypes.c_int8, 10) # 10 byte buffer
with buf: # acquire lock
ctypes_arr = buf.get_obj()
arr = np.frombuffer(ctypes_arr, dtype=np.int16) # int16 array, with size 5
total = arr.sum()
del arr, ctypes_arr # losing lock, delete local reference to the buffer
print("total before:", total) # 0
p = Process(target=subprocess_target, args=(buf,))
p.start()
p.join()
with buf:
# interpret first 8 bytes as two 4 byte ints
view = memoryview(buf.get_obj())[:8]
arr = np.frombuffer(view, dtype=np.int32)
total = arr.sum()
del arr, view
print("total after:", total) # 262146
raw_bytes = list(buf.get_obj())
assert raw_bytes == [0, 0, 1, 0, 2, 0, 3, 0, 4, 0]
def subprocess_target(buf):
"""Sets elements in buf to [0, 1, ..., n-2, n-1]"""
with buf:
arr = np.frombuffer(buf.get_obj(), dtype=np.int16)
arr[:] = range(len(arr))
del arr
if __name__ == "__main__":
main()