使用 multiprocessing.pool.map 通过同一个套接字发送

Sending over the same socket with multiprocessing.pool.map

我正在尝试通过套接字连接打包和发送柱状数据。

为了加快速度,我考虑过将打包 (struct.pack) 拆分为多个进程。

为了避免双向 pickle,我认为让打包进程自己发送数据可能更好,因为据说套接字对象可以从 Python 3.4.

开始被 pickle

这是我工作中的简化版本:

import socket
from multiprocessing import Pool
from struct import pack

# Start and connect a socket
s = socket.socket()
s.connect((ip, port))

# Data to be packed and sent in this order
data1 = 1, 2, 3, 4
data2 = 5, 6, 7, 8
data3 = 9, 10, 11, 12

# Top level column packer/sender for mp.pool
def send_column(column):
    return s.send(pack(f'{len(column)}i', *column))


pool = Pool()

# Will this necessarily send the data in order?
pool.map(send_column, (data1, data2, data3))

我的问题是 - 是否保证数据会按顺序发送?

如果不是,确保它存在的谨慎方法是什么?

我考虑过为进程设置一个全局计数器来检查是否轮到它们了,但我很乐意听到更好的想法。

查看您的用例,您有 2 个时间密集型任务:

  • packing/serializing数据
  • 发送数据

在您的机器上打包是一项 CPU 密集型任务:由于 python 中的线程总是 运行 在同一核心上,多线程可能不会(如果有的话)获益太多.在多个进程中打包可能会加快打包部分的速度,因为可以利用多个内核 ,但另一方面,您必须将数据复制到新的 space在主内存中,因为进程不共享内存。您应该测试 multiprocessing 是否有意义,如果没有,请尝试 shared memory which would eliminate the speed loss from copying the data and will let you pack your data on multiple cores(but adds a lot of complexity to your code). For packing in general I'd also recommend looking at protobuf or flatbuffers.

另一方面发送数据,从并发中获益不是因为 CPU 需要那么多时间,而是因为网络延迟和等待确认数据包,这意味着可以实现显着的加速使用 threads or asyncio 因为使用多核不会加快等待回复的速度。

我建议您测试使用多处理库在多核上打包是否具有预期的效果。如果是这样,您将必须为您的数据包编制索引或添加时间戳,以便能够在另一侧重新对齐它们。没有 "make sure they are sent in order" 的机制,因为这会消除您使用并发节省的大部分时间。所以不要尝试在不需要的地方同步,因为那样你就可以完全跳过异步工作。

然而,如果在多个进程上打包(这就是我所怀疑的)仅产生可忽略不计的加速,我建议 packing/serializing 将数据放在一个线程(在主线程中)上,然后发送数据在每个线程上或使用 asyncio。有关操作方法,请参阅 。您将不得不期望数据乱序,因此要么为您的数据包编制索引,要么为其添加时间戳。

HTH

如果出于某种原因你绝对必须打包多个进程并且按顺序发送数据,你将不得不查看共享内存并进行设置以便主进程为每组数据创建一个进程,并与正确的进程共享每个数据集的内存。然后每个子进程必须创建一个共享内存对象来写入打包数据。打包数据必须与父进程共享。然后父进程应该遍历子进程将写入的共享内存对象,并且只发送一条数据,如果它是第一个数据,或者如果前一个数据被标记为已发送。在这种情况下发送数据不应该使用线程或任何异步方式发生,因为这样将再次无法保证正确的顺序......也就是说最好不要使用这个解决方案(极其复杂 - 最小增益),使用任何一个以上 2.

  1. 套接字将由进程共享,进程由操作系统调度程序控制,操作系统调度程序无法控制此进程的执行顺序。因此,进程随机 运行 出现(这不是全部事实 - 检查 os scheduling algorithms)并且您无法保证执行顺序和包裹交付顺序。
  2. 从网络的角度来看,当您通过共享套接字发送数据时,通常您不会等待响应(如果您使用 tcp 协议),这对我们来说将显示为同步数据包 send/deliver,响应也是如此。

为了确保按顺序传送数据包,您需要确保发送给另一端的每个数据包都能收到,因此您只能使用同步连接(仅在发送前一个数据包后发送数据包,并且您确保已收到)。 在您的用例中,我建议您拥有生成腌制对象并将它们发送到队列的进程池(它们将成为生产者)。另一个对象将成为这些对象的消费者并通过网络发送它们。