Python pool.map 元组列表

Python pool.map for list of tuples

我遇到了以下问题。我正在尝试重构我的代码,以便使用多线程处理 API 调用。我的核心数据是以下格式的简单元组列表:

lst = [('/Users/sth/photo1.jpg',
      '/Users/sth/photo2'),
      ('/Users/sth/photo1.jpg',
      '/Users/sth/photo3'), (...)]

我使用的函数获取 lst 列表并通过需要一对照片的 API 对其进行处理。毕竟每对返回一个数字。到目前为止,我正在使用一个循环将一个元组放入我的函数中并生成提到的数字。我想以一种方式并行化整个计算,即一个进程占用我的列表的一部分并调用批处理中元组的函数。为此,我尝试对多处理模块使用池函数:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(2)
results = pool.map(score_function, lst)

但是,出现以下错误:

IOError: [Errno 2] No such file or directory: 'U'

这里发生了一些奇怪的事情。它试图将我的元组中的单个字符视为参数。任何想法如何正确地做到这一点?

谢谢

@edit

缺少 score_function 定义是我的错。让我更新问题:

def score_function(pairs):
    score_list = list()

    for pair in pairs:
       score = findElement(target = pair[0], source = pair[1])
       score_list.append([pair[0], pair[1], score])

    return score_list

其中 findElement 定义为:

def findElement(target, source):

    with open(source, 'rb') as source_:
        source_bytes = source_.read()

    with open(target, 'rb') as target_:
        target_bytes = target_.read()

    score = API_request(target_bytes = target_bytes,
                        source_bytes = source_bytes)
    return score

不知道你的 score_function 以及你如何定义和访问它,我猜你有类似

def score_function(param):
    with open(param[1], "r") as fp:
        ....

在那里。

错误表明你的参数根本不是一个元组而是一个字符串(param[1],如果param是一个元组,将是第二个元素,而param[1],如果param是一个字符串,将是是第二个字符,在你的例子中是 U 来自 /Users...)。放

print param

在那里,看看它是什么。要么你的 lst 完全错误并且缺少括号,并且 pool.map 将它展平并将每个单独的组件作为字符串发送到你的函数,要么你的 lst 中有一个有问题的条目不是元组。例如,如果有一个条目,而您忘记在它周围加上括号,它会将每个单独的字符串作为参数发送

lst = [('/Users/bar/photo1.jpg', '/Users/bar/photo2'),
       ('/Users/bar/photo2.jpg', '/Users/bar/photo3'),
       '/Users/bar/photo3.jpg', '/Users/bar/photo4',
       (...., .....), (...., .....)]

它可以处理前两个,但第三个(和第四个,因为它现在是一个单独的条目,而不是元组中的第二个组件)会导致您现在遇到的错误。无论哪种方式,打印出您的 score_function 收到的参数应该可以帮助您找出问题所在。您对 pool.map 的使用是正确的,它应该可以工作,假设您的输入符合您的预期。

你的问题是 for 循环。它将您的元组分解为单独的字符串。这样做它应该工作:

def score_function(pairs):
    score_list = list()

    score = findElement(target = pairs[0], source = pairs[1])
    score_list.append([pairs[0], pairs[1], score])

    return score_list

您可能假设您的 score_function 会收到 lst 变量作为参数。这不会发生。 lst 需要像您的情况一样是一个列表,并且 pool.map 会自动将其拆分为各个元素并将一个元素准确地提供给 score_function 并一直这样做直到整个列表被由你的工人处理。对 worker 的每次调用都只接收它应该处理的一个元素作为参数。您的各个元素是元组 (path1, path2),当您在此元组上调用 for 时,您在循环中只会收到一个路径(字符串),并且 pair[1] 只是该字符串的第二个字符。

希望这对您有所帮助。

您可以使用 starmap 函数代替地图,如下所示:

from multiprocessing import Pool 
pool = Pool(processes=4)
results = pool.starmap(score_function, lst)
pool.close()
pool.join()