Python 每个线程具有不同常量的多处理
Python multiprocessing with different constant for each thread
我想用调用 boto3 的函数创建一个池 api 并为每个线程使用不同的存储桶名称:
我的函数是:
def face_reko(source_data, target_data):
bucket = s3.Bucket(bucket_name)
for key in bucket.objects.all():
key.delete()
s3_client.put_object(Bucket=bucket_name, Key=target_img, Body=target_data)
s3_client.put_object(Bucket=bucket_name, Key=source_img, Body=source_data)
response = reko.compare_faces(
SourceImage={
'S3Object': {
'Bucket': bucket_name,
'Name' : source_img
}
},
TargetImage={
'S3Object' : {
'Bucket' : bucket_name,
'Name' : target_img
}
}
)
if len(response['FaceMatches']) > 0:
return True
else:
return False
所以基本上它会删除存储桶中的所有内容,上传 2 张新图像,然后使用 Rekognition api 比较这 2 张图像。由于我无法在同一个存储桶中两次创建相同的图像,我想为每个线程创建一个存储桶,然后将一个常量传递给存储桶名称的函数,而不是 bucket_name
const.
所以最后我找到了解决我的问题的方法。我没有将函数映射到池,而是创建了一个 Worker
class 定义如下:
class Worker():
def __init__(self, proc_number, splited_list):
self.pool = Pool(proc_number)
self.proc_number = proc_number
self.splited_list = splited_list
def callback(self, result):
if result:
self.pool.terminate()
def do_job(self):
for i in range(self.proc_number):
self.pool.apply_async(face_reko, args=(self.splited_list[i],source_data, i), callback=self.callback)
self.pool.close()
self.pool.join()
所以 Worker
obj 是由许多进程和一个列表列表构成的(splitted_list
是我的主列表,分为 number_of_proc
)。然后当 do_job
函数被调用时,池开始创建具有 id i
的进程,可以在 face_reko
函数中使用。 face_reko
returns True
时池停止。要启动 Worker.pool
,只需创建一个 Worker
并像这样调用 do_job
函数:
w = Worker(proc_number=proc_number, splited_list=splited_list)
w.do_job()
希望对其他人有所帮助!
我想用调用 boto3 的函数创建一个池 api 并为每个线程使用不同的存储桶名称:
我的函数是:
def face_reko(source_data, target_data):
bucket = s3.Bucket(bucket_name)
for key in bucket.objects.all():
key.delete()
s3_client.put_object(Bucket=bucket_name, Key=target_img, Body=target_data)
s3_client.put_object(Bucket=bucket_name, Key=source_img, Body=source_data)
response = reko.compare_faces(
SourceImage={
'S3Object': {
'Bucket': bucket_name,
'Name' : source_img
}
},
TargetImage={
'S3Object' : {
'Bucket' : bucket_name,
'Name' : target_img
}
}
)
if len(response['FaceMatches']) > 0:
return True
else:
return False
所以基本上它会删除存储桶中的所有内容,上传 2 张新图像,然后使用 Rekognition api 比较这 2 张图像。由于我无法在同一个存储桶中两次创建相同的图像,我想为每个线程创建一个存储桶,然后将一个常量传递给存储桶名称的函数,而不是 bucket_name
const.
所以最后我找到了解决我的问题的方法。我没有将函数映射到池,而是创建了一个 Worker
class 定义如下:
class Worker():
def __init__(self, proc_number, splited_list):
self.pool = Pool(proc_number)
self.proc_number = proc_number
self.splited_list = splited_list
def callback(self, result):
if result:
self.pool.terminate()
def do_job(self):
for i in range(self.proc_number):
self.pool.apply_async(face_reko, args=(self.splited_list[i],source_data, i), callback=self.callback)
self.pool.close()
self.pool.join()
所以 Worker
obj 是由许多进程和一个列表列表构成的(splitted_list
是我的主列表,分为 number_of_proc
)。然后当 do_job
函数被调用时,池开始创建具有 id i
的进程,可以在 face_reko
函数中使用。 face_reko
returns True
时池停止。要启动 Worker.pool
,只需创建一个 Worker
并像这样调用 do_job
函数:
w = Worker(proc_number=proc_number, splited_list=splited_list)
w.do_job()
希望对其他人有所帮助!