multiprocessing pool.map 未按顺序处理列表

multiprocessing pool.map not processing list in order

我有这个脚本来并行处理一些 url:

import multiprocessing
import time

list_of_urls = []

for i in range(1,1000):
    list_of_urls.append('http://example.com/page=' + str(i))

def process_url(url):
    page_processed = url.split('=')[1]
    print 'Processing page %s'% page_processed
    time.sleep(5)

pool = multiprocessing.Pool(processes=4)
pool.map(process_url, list_of_urls)

列表是有序的,但是当我 运行 它时,脚本不会按顺序从列表中选择 url:

Processing page 1
Processing page 64
Processing page 127
Processing page 190
Processing page 65
Processing page 2
Processing page 128
Processing page 191

相反,我希望它首先处理第 1,2,3,4 页,然后按照列表中的顺序继续。有这样做的选项吗?

多处理是一种异步操作,这意味着它根据定义是非顺序的。线程(或在 python 的案例进程中)从您的列表中拉出 urls,并且无法保证哪个进程将首先完成。因此 url 1 可能会在 url 64 之前开始处理,但是由于网络 I/O 中的随机性,例如 url 64 可能会先完成。

首先问问自己是否真的需要按顺序执行这些操作。如果答案是肯定的,那么最好的办法是执行阻塞步骤——强制所有并行计算先完成,然后对完成的结果进行排序。

因此,如果您的 url 列表非常大,并且您想要一些顺序元素但仍然利用并行化,您可以将列表分块,然后按顺序 运行 每个块通过上面的并行逻辑。

如果您不传递参数 chunksize,那么 map 将使用此算法计算区块:

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
if extra:
   chunksize += 1

它将您的可迭代对象切割成 task_batches 和 运行 在不同的进程中。这就是为什么它不按顺序排列的原因。解决方案是声明块大小等于 1。

import multiprocessing
import time

list_test = range(10)

def process(task):
    print "task:", task
    time.sleep(1)

pool = multiprocessing.Pool(processes=3)
pool.map(process, list_test, chunksize=1)

task: 0
task: 1
task: 2
task: 3
task: 4
task: 5
task: 6
task: 7
task: 8
task: 9