共享内存缓冲区的粒度锁定

Granular locking of shared memory buffer

我需要创建一块共享内存:一个进程向其中写入数据,另一个进程从中采样。为了尽量缩短共享内存被锁定的时间,我尝试为共享内存缓冲区中的每个索引各创建一个锁。

当我运行这段代码时,出现错误:OSError: [Errno 23] Too many open files in system。(下面的堆栈跟踪已省略部分内容)

如何实现对长度 >100,000 的共享内存缓冲区的精细控制,以便在读取某些段的同时写入其他段?我可以使用其他构造吗?

In [1]: import multiprocessing as mp 
   ...: from multiprocessing.shared_memory import SharedMemory 
   ...:  
   ...: class Memory: 
   ...:     def __init__(self, length): 
   ...:         self.shm = SharedMemory(create=True, size=length) 
   ...:         self.locks = [mp.Lock() for _ in range(length)] 
   ...:  
   ...:     def __getitem__(self, item): 
   ...:         return int.from_bytes(self.shm.buf[item], 'big') 
   ...:  
   ...:     def __setitem__(self, key, value): 
   ...:         assert isinstance(value, int) 
   ...:         self.shm.buf[key] = value.to_bytes(1, 'big') 
   ...:                                                                         

In [2]: m = Memory(100_000)                                                     
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-2-2c6433483d72> in <module>
----> 1 m = Memory(100_000)

...

OSError: [Errno 23] Too many open files in system

我在 Kubernetes 上运行,使用的 Docker 镜像是 gitlab-registry.nautilus.optiputer.net/ian/torch:latest

ulimit 返回 unlimited。无论如何,在 Kubernetes 上设置 ulimit 显然仍是一个尚未解决的问题(open issue)

更多细节:

推送 worker(push worker):

def _push_worker(self) -> None:
    """
    Move samples from `replay_in_queue` into `memory` in batches

    Runs forever: every sample taken from `self.replay_in_queue` is staged in
    `self.buffer_in`. Once the staging list holds at least
    `self.initial_memory // 100` samples it is written to `self.memory` as one
    slice starting at `self.sample_count % self.memory_maxlen`, the sample
    counter advances by 100 and the staging list is replaced by a new empty one.
    """
    chunk_len = 100
    while True:
        incoming = self.replay_in_queue.get()
        self.buffer_in.append(incoming)

        # Keep staging until enough samples have accumulated.
        if len(self.buffer_in) < self.initial_memory // chunk_len:
            continue

        first = self.sample_count % self.memory_maxlen
        self.memory[first: first + chunk_len] = self.buffer_in
        self.sample_count += chunk_len
        self.buffer_in = []

采样 worker(sample worker):


def _sample_worker(self) -> None:
    """
    Continuously publish random batches drawn from `memory`

    Runs forever: each iteration draws `self.batch_size` items from
    `self.memory` with replacement (`random.choices`) and puts the resulting
    list on `self.replay_out_queue`.
    """
    while True:
        self.replay_out_queue.put(
            random.choices(self.memory, k=self.batch_size)
        )

我的解决方案是按块(chunk)锁定内存,而不是为每个索引单独加锁。要获得最佳性能,需要对块的数量进行调优。


class Memory:
    """
    A shared memory utility class

    Stores `length` fixed-size `Transition` records back to back in a single
    `SharedMemory` block. The records are partitioned into `n_locks` equally
    sized chunks, each guarded by its own `multiprocessing.Lock`, so that
    accesses to records in different chunks do not block each other.

    Byte layout of one record (`stride` bytes, integers are big-endian):

    ==============  =====================  ==========================
    field           size                   encoding
    ==============  =====================  ==========================
    actor_id        `int_size`             unsigned int
    step_number     `int_size`             unsigned int
    state           `array_bytes`          raw `array_dtype` buffer
    action          `int_size`             unsigned int
    next_state      `array_bytes`          raw `array_dtype` buffer
    reward          `int_size`             signed int
    done            1                      unsigned int
    ==============  =====================  ==========================
    """
    # n_bytes:
    #   2 4x84x84 uint8 arrays
    #   4 32-bit (4-byte) numbers
    #   1 bool (1-byte)
    int_size = 4
    array_dtype = 'uint8'
    array_bytes = 4 * 84 * 84
    array_shape = (4, 84, 84)
    stride = 2 * array_bytes + 4 * int_size + 1
    # Cursor used only by the legacy `_get` / `_set` helpers below.
    _offset = 0

    def __init__(self, length: int, n_locks: int = 1_000):
        """
        Allocate the shared block and the per-chunk locks

        :param length: Number of records the memory can hold
        :param n_locks: Number of lock-protected chunks; must divide `length`
        :raises ValueError: if `length` or `n_locks` is not positive, or
            `length` is not divisible by `n_locks`
        """
        # Validate before allocating so a bad argument never leaves a
        # shared memory segment behind.
        if length <= 0:
            raise ValueError("length must be positive")
        if n_locks <= 0 or length % n_locks != 0:
            raise ValueError("length must be divisible by _n_locks")

        self._length = length
        self._shared_memory = SharedMemory(create=True, size=self.stride * length)
        self._locks = [mp.Lock() for _ in range(n_locks)]
        self._lock_length = length // n_locks

    def __del__(self):
        # `_shared_memory` is missing when `__init__` raised before allocating.
        shared = getattr(self, "_shared_memory", None)
        if shared is None:
            return
        shared.close()
        try:
            shared.unlink()
        except FileNotFoundError:
            # The segment was already unlinked (e.g. by another copy of this
            # object); there is nothing left to destroy.
            pass

    @property
    def _buf(self):
        """The `memoryview` over the whole shared block."""
        return self._shared_memory.buf

    def __len__(self):
        return self._length

    def __getitem__(self, index: Union[slice, int]):
        """
        Read one `Transition` (int index) or a list of them (slice)

        :param index: Record index or slice of record indices
        :raises IndexError: if the index is out of bounds or of another type
        """
        if isinstance(index, int):
            return self._get_item(index)
        if isinstance(index, slice):
            return self._get_slice(index)
        raise IndexError(f"unsupported index type {type(index).__name__}")

    def _get_slice(self, slice_: slice):
        """
        Read the records selected by `slice_`

        Missing bounds default to `0`, `len(self)` and a step of `1`.

        :raises IndexError: if the stop bound exceeds the memory length
        """
        start = 0 if slice_.start is None else slice_.start
        step = 1 if slice_.step is None else slice_.step
        stop = self._length if slice_.stop is None else slice_.stop
        # Compare the normalised bound: `slice_.stop` may be None.
        if stop > self._length:
            raise IndexError(f"slice stop {stop} out of bounds")
        return [self._get_item(i % self._length) for i in range(start, stop, step)]

    def _check_index(self, index: int) -> None:
        """Raise `IndexError` unless `0 <= index < len(self)`."""
        if index < 0 or index >= self._length:
            raise IndexError(f"index {index} out of bounds")

    def _get_item(self, index) -> "Transition":
        """
        Read and decode the record stored at `index`

        :raises IndexError: if `index` is out of bounds
        """
        self._check_index(index)
        base = index * self.stride
        # Hold the chunk lock only for the copy; decoding works on the copy.
        with self._locks[index // self._lock_length]:
            raw = bytes(self._buf[base: base + self.stride])
        return self._decode(raw)

    def _decode(self, raw: bytes) -> "Transition":
        """Turn one raw `stride`-byte record into a `Transition`."""
        int_size, array_bytes = self.int_size, self.array_bytes
        step_at = int_size
        state_at = step_at + int_size
        action_at = state_at + array_bytes
        next_at = action_at + int_size
        reward_at = next_at + array_bytes
        done_at = reward_at + int_size

        actor_id = int.from_bytes(raw[:step_at], 'big')
        step_number = int.from_bytes(raw[step_at:state_at], 'big')
        state = np.frombuffer(
            raw[state_at:action_at], dtype=self.array_dtype
        ).reshape(self.array_shape)
        action = int.from_bytes(raw[action_at:next_at], 'big')
        reward = int.from_bytes(raw[reward_at:done_at], 'big', signed=True)
        done = int.from_bytes(raw[done_at:done_at + 1], 'big')
        # A finished episode has no successor state; whatever bytes sit in
        # that slot are leftovers and must not be exposed.
        if done:
            next_state = None
        else:
            next_state = np.frombuffer(
                raw[next_at:reward_at], dtype=self.array_dtype
            ).reshape(self.array_shape)
        return Transition(actor_id, step_number, state, action, next_state, reward, done)

    def _get(self, n_bytes: int) -> bytes:
        """
        Get item at `_offset` and move forward `n_bytes`

        Legacy cursor-based reader kept for backward compatibility; item
        access no longer uses it because the cursor lives on the instance and
        is therefore not protected by the per-chunk locks.

        :param n_bytes: Number of bytes to retrieve from memory
        :return: bytes copied from memory
        """
        item = self._buf[self._offset: self._offset + n_bytes]
        self._offset += n_bytes
        return item.tobytes()

    def __setitem__(self, index: Union[int, slice],
                    transition: "Union[List[Transition], Transition]"):
        """
        Store `transition` in shared memory

        :param index: Index (or slice) of the memory location(s) to store
        :param transition: a `Transition`, or a list of them for a slice
        :raises TypeError: if `transition` does not match the kind of `index`
        :raises IndexError: if `index` is neither an int nor a slice
        """
        if isinstance(index, int):
            if not isinstance(transition, Transition):
                raise TypeError("an int index requires a Transition")
            self._set_item(index, transition)
        elif isinstance(index, slice):
            if not isinstance(transition, list):
                raise TypeError("a slice index requires a list of Transitions")
            self._set_slice(index, transition)
        else:
            raise IndexError(f"unsupported index type {type(index).__name__}")

    def _set_slice(self, slice_: slice, transitions: "List[Transition]"):
        """
        Store `transitions` at the positions selected by `slice_`

        Positions wrap around the end of the memory (modulo `len(self)`), and
        writing stops at whichever of the slice or the list is exhausted first.
        """
        start = 0 if slice_.start is None else slice_.start
        step = 1 if slice_.step is None else slice_.step
        stop = self._length if slice_.stop is None else slice_.stop
        for i, t in zip(range(start, stop, step), transitions):
            self._set_item(i % self._length, t)

    def _array_payload(self, array, name: str) -> bytes:
        """Serialise `array` and verify it fills exactly one array slot."""
        payload = array.tobytes()
        if len(payload) != self.array_bytes:
            raise ValueError(
                f"{name} must serialise to {self.array_bytes} bytes, got {len(payload)}"
            )
        return payload

    def _set_item(self, index, transition):
        """
        Encode `transition` and write it to the record at `index`

        When `transition.next_state` is None its slot is left untouched.

        :raises IndexError: if `index` is out of bounds
        :raises ValueError: if an array does not have the expected byte size
        """
        self._check_index(index)

        # Encode everything before taking the lock: the lock is held only for
        # the copy, and an encoding error cannot leave a half-written record.
        # 'actor_id', 'step_number', 'state', 'action', 'next_state', 'reward', 'done'
        head = b"".join((
            int(transition.actor_id).to_bytes(self.int_size, 'big'),
            int(transition.step_number).to_bytes(self.int_size, 'big'),
            self._array_payload(transition.state, "state"),
            int(transition.action).to_bytes(self.int_size, 'big'),
        ))
        tail = (
            int(transition.reward).to_bytes(self.int_size, 'big', signed=True)
            + int(transition.done).to_bytes(1, 'big')
        )
        if transition.next_state is None:
            record = None
        else:
            record = head + self._array_payload(transition.next_state, "next_state") + tail

        base = index * self.stride
        with self._locks[index // self._lock_length]:
            if record is not None:
                self._buf[base: base + self.stride] = record
            else:
                # Skip over the next_state slot, keeping its previous bytes.
                self._buf[base: base + len(head)] = head
                tail_at = base + self.stride - len(tail)
                self._buf[tail_at: tail_at + len(tail)] = tail

    def _set(self, bytearray_: Union[bytearray, bytes]):
        """
        update `_buf` and move `_offset`

        Legacy cursor-based writer kept for backward compatibility; item
        access no longer uses it (see `_get`).

        :param bytearray_: a bytearray
        """
        len_ = len(bytearray_)
        self._buf[self._offset: self._offset + len_] = bytearray_
        self._offset = self._offset + len_

    def __iter__(self):
        """Yield every record in index order."""
        for i in range(self._length):
            yield self[i]