在 concurrent.futures.ProcessPoolExecutor map() 和 submit() 方法中使用 numpy.fromiter & numpy.array 的问题

Issues with using numpy.fromiter & numpy.array in concurrent.futures.ProcessPoolExecutor map() and submit() methods

背景:这篇 blog 报告了使用 numpy.fromiter() 相对于 numpy.array() 的速度优势。以其提供的脚本为基础,我想看看在 Python 的 concurrent.futures.ProcessPoolExecutor 类的 map() 和 submit() 方法中执行时,numpy.fromiter() 是否仍有这种优势。

以下是我运行 2 秒得到的发现:

  1. 很明显,当数组大小一般 <256 时,numpy.fromiter() 比 numpy.array() 快。
  2. 然而,当由 Python 的 concurrent.futures.ProcessPoolExecutor 类的 map() 和 submit() 方法执行时,numpy.fromiter() 和 numpy.array() 的性能可能明显低于串行运行,并且结果不一致。

问题:在 Python 的 concurrent.futures.ProcessPoolExecutor 类的 map() 和 submit() 方法中使用时,numpy.fromiter() 和 numpy.array() 的这种不一致和较差的性能是否可以避免?我怎样才能改进我的脚本?

下面给出了我用于此基准测试的 python 脚本。

map():

#!/usr/bin/env python3.5
import concurrent.futures
from itertools import chain 
import time
import numpy as np
import pygal
from os import path

# Array lengths to benchmark: the powers of two from 2 through 1024.
list_sizes = [1 << exponent for exponent in range(1, 11)]
# Time budget, in seconds, given to each timed loop.
seconds = 2


def test(size_array):
    pyarray = [float(x) for x in range(size_array)]

    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.fromiter(pyarray, dtype=np.float32, count=size_array)
        iterations += 1
    fromiter_count = iterations

    # array
    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.array(pyarray, dtype=np.float32)
        iterations += 1
    array_count = iterations

    #return array_count, fromiter_count
    return size_array, array_count, fromiter_count


def main():
    """Benchmark every size through ``executor.map`` and chart the counts.

    Each entry of ``list_sizes`` is handed to ``test`` in a pool of six
    worker processes.  The per-size counts are printed, plotted with pygal
    and written as a PNG next to this script; the total run time is printed
    last.
    """
    begin = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
        # One (size, array_count, fromiter_count) tuple per input size.
        rows = list(executor.map(test, list_sizes))

    # The flattened form is only kept for this diagnostic print.
    data = list(chain.from_iterable(rows))
    print('data = ', data)

    results = {}
    for size_array, array_count, fromiter_count in rows:
        res = (array_count, fromiter_count)
        results[size_array] = res
        print("Result for size {} in {} seconds: {}".format(size_array,seconds,res))

    out_folder = path.dirname(path.realpath(__file__))
    print("Create diagrams in {}".format(out_folder))

    chart = pygal.Line()
    chart.title = "Performance in {} seconds".format(seconds)
    chart.x_title = "Array size"
    chart.y_title = "Iterations"

    x_axis = sorted(results.keys())
    print(x_axis)
    chart.x_labels = x_axis
    chart.add('np.array', [results[x][0] for x in x_axis])
    chart.add('np.fromiter', [results[x][1] for x in x_axis])
    chart.render_to_png(path.join(out_folder, 'result_{}_concurrent_futures_map.png'.format(seconds)))

    end = time.time()
    compute_time = end - begin
    print("Program Time = ", compute_time)


# Worker processes may re-import this module (spawn start method); the guard
# keeps them from creating pools of their own while doing so.
if __name__ == "__main__":
    main()

submit():

#!/usr/bin/env python3.5
import concurrent.futures
from itertools import chain 
import time
import numpy as np
import pygal
from os import path

# Array lengths to benchmark: 2, 4, 8, ... up to 1024.
list_sizes = [pow(2, k) for k in range(1, 11)]
# Time budget, in seconds, given to each timed loop.
seconds = 2


def test(size_array):
    pyarray = [float(x) for x in range(size_array)]

    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.fromiter(pyarray, dtype=np.float32, count=size_array)
        iterations += 1
    fromiter_count = iterations

    # array
    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.array(pyarray, dtype=np.float32)
        iterations += 1
    array_count = iterations

    return size_array, array_count, fromiter_count


def main():
    """Benchmark every size through ``executor.submit`` and chart the counts.

    One ``test`` task per entry of ``list_sizes`` is submitted to a pool of
    six worker processes.  The per-size counts are printed, plotted with
    pygal and written as a PNG next to this script; the total run time is
    printed last.
    """
    begin = time.time()

    with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
        futures = [executor.submit(test, size_array)
                   for size_array in list_sizes]
        # Results arrive in completion order; every tuple carries its own
        # size, so no future-to-size mapping is needed.
        rows = [f.result() for f in concurrent.futures.as_completed(futures)]

    # The flattened form is only kept for this diagnostic print.
    data = list(chain.from_iterable(rows))
    print('data = ', data)

    results = {}
    for size_array, array_count, fromiter_count in rows:
        res = (array_count, fromiter_count)
        results[size_array] = res
        print("Result for size {} in {} seconds: {}".format(size_array,seconds,res))

    out_folder = path.dirname(path.realpath(__file__))
    print("Create diagrams in {}".format(out_folder))

    chart = pygal.Line()
    chart.title = "Performance in {} seconds".format(seconds)
    chart.x_title = "Array size"
    chart.y_title = "Iterations"

    x_axis = sorted(results.keys())
    print(x_axis)
    chart.x_labels = x_axis
    chart.add('np.array', [results[x][0] for x in x_axis])
    chart.add('np.fromiter', [results[x][1] for x in x_axis])
    chart.render_to_png(path.join(out_folder, 'result_{}_concurrent_futures_submitv2.png'.format(seconds)))

    end = time.time()
    compute_time = end - begin
    print("Program Time = ", compute_time)


# Worker processes may re-import this module (spawn start method); the guard
# keeps them from creating pools of their own while doing so.
if __name__ == "__main__":
    main()

串行:(对原始代码(original code)进行了细微改动)

#!/usr/bin/env python3.5
import time
import numpy as np
import pygal
from os import path

# Array lengths to benchmark: ten successive powers of two, starting at 2.
list_sizes = [2 << shift for shift in range(10)]
# Time budget, in seconds, given to each timed loop.
seconds = 2


def test(size_array):
    pyarray = [float(x) for x in range(size_array)]

    # fromiter
    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.fromiter(pyarray, dtype=np.float32, count=size_array)
        iterations += 1
    fromiter_count = iterations

    # array
    start = time.time()
    iterations = 0
    while time.time() - start <= seconds:
        np.array(pyarray, dtype=np.float32)
        iterations += 1
    array_count = iterations

    return array_count, fromiter_count


# Time the whole run, chart rendering included.
begin = time.time()
results = {}

# Benchmark the sizes one after another in this process.
for size_array in list_sizes:
    results[size_array] = res = test(size_array)
    print("Result for size {} in {} seconds: {}".format(size_array,seconds,res))

# The chart is written into the directory that holds this script.
out_folder = path.dirname(path.realpath(__file__))
print("Create diagrams in {}".format(out_folder))

chart = pygal.Line()
chart.title = "Performance in {} seconds".format(seconds)
chart.x_title = "Array size"
chart.y_title = "Iterations"

x_axis = sorted(results)
print(x_axis)
chart.x_labels = x_axis
# test() returns (array_count, fromiter_count): column 0 is np.array,
# column 1 is np.fromiter.
for column, label in enumerate(('np.array', 'np.fromiter')):
    chart.add(label, [results[size][column] for size in x_axis])
chart.render_to_png(path.join(out_folder, 'result_{}_serial.png'.format(seconds)))

end = time.time()
compute_time = end - begin
print("Program Time = ", compute_time)

我之前遇到的 numpy.fromiter() 和 numpy.array() 表现不一致和糟糕的原因,似乎与 concurrent.futures.ProcessPoolExecutor 使用的 CPU 数量有关。我之前使用了 6 个 CPU。下图显示了使用 2、4、6 和 8 个 CPU 时 numpy.fromiter() 和 numpy.array() 的相应性能。这些图表显示存在一个最佳的 CPU 使用数量。使用过多的 CPU(即 >4 个 CPU)可能不利于小数组(<512 个元素)。例如,与串行运行相比,>4 个 CPU 会导致性能下降(约为 1/2 倍),甚至性能不一致。