Python pool.map 元组列表
Python pool.map for list of tuples
我遇到了以下问题。我正在尝试重构我的代码,以便使用多线程处理 API 调用。我的核心数据是以下格式的简单元组列表:
lst = [('/Users/sth/photo1.jpg',
'/Users/sth/photo2'),
('/Users/sth/photo1.jpg',
'/Users/sth/photo3'), (...)]
我使用的函数获取 lst 列表并通过需要一对照片的 API 对其进行处理。毕竟每对返回一个数字。到目前为止,我正在使用一个循环将一个元组放入我的函数中并生成提到的数字。我想以一种方式并行化整个计算,即一个进程占用我的列表的一部分并调用批处理中元组的函数。为此,我尝试对多处理模块使用池函数:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(2)
results = pool.map(score_function, lst)
但是,出现以下错误:
IOError: [Errno 2] No such file or directory: 'U'
这里发生了一些奇怪的事情。它试图将我的元组中的单个字符视为参数。任何想法如何正确地做到这一点?
谢谢
@edit
缺少 score_function 定义是我的错。让我更新问题:
def score_function(pairs):
score_list = list()
for pair in pairs:
score = findElement(target = pair[0], source = pair[1])
score_list.append([pair[0], pair[1], score])
return score_list
其中 findElement 定义为:
def findElement(target, source):
with open(source, 'rb') as source_:
source_bytes = source_.read()
with open(target, 'rb') as target_:
target_bytes = target_.read()
score = API_request(target_bytes = target_bytes,
source_bytes = source_bytes)
return score
不知道你的 score_function 以及你如何定义和访问它,我猜你有类似
def score_function(param):
with open(param[1], "r") as fp:
....
在那里。
错误表明你的参数根本不是一个元组而是一个字符串(param[1],如果param是一个元组,将是第二个元素,而param[1],如果param是一个字符串,将是是第二个字符,在你的例子中是 U 来自 /Users...)。放
print param
在那里,看看它是什么。要么你的 lst 完全错误并且缺少括号,并且 pool.map 将它展平并将每个单独的组件作为字符串发送到你的函数,要么你的 lst 中有一个有问题的条目不是元组。例如,如果有一个条目,而您忘记在它周围加上括号,它会将每个单独的字符串作为参数发送
lst = [('/Users/bar/photo1.jpg', '/Users/bar/photo2'),
('/Users/bar/photo2.jpg', '/Users/bar/photo3'),
'/Users/bar/photo3.jpg', '/Users/bar/photo4',
(...., .....), (...., .....)]
它可以处理前两个,但第三个(和第四个,因为它现在是一个单独的条目,而不是元组中的第二个组件)会导致您现在遇到的错误。无论哪种方式,打印出您的 score_function
收到的参数应该可以帮助您找出问题所在。您对 pool.map 的使用是正确的,它应该可以工作,假设您的输入符合您的预期。
你的问题是 for 循环。它将您的元组分解为单独的字符串。这样做它应该工作:
def score_function(pairs):
score_list = list()
score = findElement(target = pairs[0], source = pairs[1])
score_list.append([pairs[0], pairs[1], score])
return score_list
您可能假设您的 score_function
会收到 lst
变量作为参数。这不会发生。 lst
需要像您的情况一样是一个列表,并且 pool.map 会自动将其拆分为各个元素并将一个元素准确地提供给 score_function
并一直这样做直到整个列表被由你的工人处理。对 worker 的每次调用都只接收它应该处理的一个元素作为参数。您的各个元素是元组 (path1, path2),当您在此元组上调用 for
时,您在循环中只会收到一个路径(字符串),并且 pair[1] 只是该字符串的第二个字符。
希望这对您有所帮助。
您可以使用 starmap 函数代替地图,如下所示:
from multiprocessing import Pool
pool = Pool(processes=4)
results = pool.starmap(score_function, lst)
pool.close()
pool.join()
我遇到了以下问题。我正在尝试重构我的代码,以便使用多线程处理 API 调用。我的核心数据是以下格式的简单元组列表:
lst = [('/Users/sth/photo1.jpg',
'/Users/sth/photo2'),
('/Users/sth/photo1.jpg',
'/Users/sth/photo3'), (...)]
我使用的函数获取 lst 列表并通过需要一对照片的 API 对其进行处理。毕竟每对返回一个数字。到目前为止,我正在使用一个循环将一个元组放入我的函数中并生成提到的数字。我想以一种方式并行化整个计算,即一个进程占用我的列表的一部分并调用批处理中元组的函数。为此,我尝试对多处理模块使用池函数:
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(2)
results = pool.map(score_function, lst)
但是,出现以下错误:
IOError: [Errno 2] No such file or directory: 'U'
这里发生了一些奇怪的事情。它试图将我的元组中的单个字符视为参数。任何想法如何正确地做到这一点?
谢谢
@edit
缺少 score_function 定义是我的错。让我更新问题:
def score_function(pairs):
score_list = list()
for pair in pairs:
score = findElement(target = pair[0], source = pair[1])
score_list.append([pair[0], pair[1], score])
return score_list
其中 findElement 定义为:
def findElement(target, source):
with open(source, 'rb') as source_:
source_bytes = source_.read()
with open(target, 'rb') as target_:
target_bytes = target_.read()
score = API_request(target_bytes = target_bytes,
source_bytes = source_bytes)
return score
不知道你的 score_function 以及你如何定义和访问它,我猜你有类似
def score_function(param):
with open(param[1], "r") as fp:
....
在那里。
错误表明你的参数根本不是一个元组而是一个字符串(param[1],如果param是一个元组,将是第二个元素,而param[1],如果param是一个字符串,将是是第二个字符,在你的例子中是 U 来自 /Users...)。放
print param
在那里,看看它是什么。要么你的 lst 完全错误并且缺少括号,并且 pool.map 将它展平并将每个单独的组件作为字符串发送到你的函数,要么你的 lst 中有一个有问题的条目不是元组。例如,如果有一个条目,而您忘记在它周围加上括号,它会将每个单独的字符串作为参数发送
lst = [('/Users/bar/photo1.jpg', '/Users/bar/photo2'),
('/Users/bar/photo2.jpg', '/Users/bar/photo3'),
'/Users/bar/photo3.jpg', '/Users/bar/photo4',
(...., .....), (...., .....)]
它可以处理前两个,但第三个(和第四个,因为它现在是一个单独的条目,而不是元组中的第二个组件)会导致您现在遇到的错误。无论哪种方式,打印出您的 score_function
收到的参数应该可以帮助您找出问题所在。您对 pool.map 的使用是正确的,它应该可以工作,假设您的输入符合您的预期。
你的问题是 for 循环。它将您的元组分解为单独的字符串。这样做它应该工作:
def score_function(pairs):
score_list = list()
score = findElement(target = pairs[0], source = pairs[1])
score_list.append([pairs[0], pairs[1], score])
return score_list
您可能假设您的 score_function
会收到 lst
变量作为参数。这不会发生。 lst
需要像您的情况一样是一个列表,并且 pool.map 会自动将其拆分为各个元素并将一个元素准确地提供给 score_function
并一直这样做直到整个列表被由你的工人处理。对 worker 的每次调用都只接收它应该处理的一个元素作为参数。您的各个元素是元组 (path1, path2),当您在此元组上调用 for
时,您在循环中只会收到一个路径(字符串),并且 pair[1] 只是该字符串的第二个字符。
希望这对您有所帮助。
您可以使用 starmap 函数代替地图,如下所示:
from multiprocessing import Pool
pool = Pool(processes=4)
results = pool.starmap(score_function, lst)
pool.close()
pool.join()