Python 如何使用 multiprocessing.pool 并行下载多个文件
Python How to download multiple files in parallel using multiprocessing.pool
我正在尝试使用 multiprocessing.Pool
下载和提取 zip 文件。但是每次我执行脚本时,只会下载 3 个 zips,而看不到剩余的文件在目录中(CPU % 也接近 100%)。有人可以帮助我如何解决这个 problem/suggest 更好的方法并遵循我尝试过的代码片段。我对多处理完全陌生。我的目标是在不达到最大值 CPU 的情况下并行下载多个文件。
import StringIO
import os
import sys
import zipfile
from multiprocessing import Pool, cpu_count
import requests
filePath = os.path.dirname(os.path.abspath(__file__))
print("filePath is %s " % filePath)
sys.path.append(filePath)
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
def download_zips(url):
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(StringIO.StringIO(response.content))
print("\n Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print("extracted {} \n".format(file_name))
sourceZip.close()
if __name__ == "__main__":
print("There are {} CPUs on this machine ".format(cpu_count()))
pool = Pool(cpu_count())
results = pool.map(download_zips, url)
pool.close()
pool.join()
输出低于
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
There are 4 CPUs on this machine
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
Downloaded bbcsport.zip
extracted bbcsport.zip
Downloaded 3sources.zip
extracted 3sources.zip
Downloaded multiview_data_20130124.zip
Downloaded movielists_20130821.zip
Downloaded movielists_20130821.zip
extracted multiview_data_20130124.zip
extracted movielists_20130821.zip
extracted movielists_20130821.zip
我对您的功能进行了一些小改动,并且运行良好。请注意:
- 文件
".../movielists_20130821.zip"
在您的列表中出现了两次,所以您下载了同一个东西两次(可能是打字错误?)
- 文件
".../multiview_data_20130124.zip"
、".../movielists_20130821.zip"
和 ".../3sources.zip"
解压缩后会生成一个新目录。但是,文件 ".../bbcsport.zip"
在解压缩后会将其文件放在根文件夹中,即您当前的工作目录(请参见下图)。也许您错过了这张支票?
- 我在下载函数中添加了一个 try/except 块。为什么?多处理通过为 运行 东西创建新的(子)进程来工作。 如果子进程抛出异常,父进程不会捕获它。所以如果在这个子流程中出现任何错误,那肯定是logged/handled那里。
import sys, os
import zipfile
import requests
from multiprocessing import Pool, cpu_count
from functools import partial
from io import BytesIO
def download_zip(url, filePath):
try:
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(BytesIO(response.content))
print(" Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print(" extracted {}".format(file_name))
sourceZip.close()
except Exception as e:
print(e)
if __name__ == "__main__":
filePath = os.path.dirname(os.path.abspath(__file__))
print("filePath is %s " % filePath)
# sys.path.append(filePath) # why do you need this?
urls = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
print("There are {} CPUs on this machine ".format(cpu_count()))
pool = Pool(cpu_count())
download_func = partial(download_zip, filePath = filePath)
results = pool.map(download_func, urls)
pool.close()
pool.join()
我建议你使用多线程来完成它,因为它是一个 I/O 绑定,如下所示:
import requests, zipfile, io
import concurrent.futures
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
def download_zips(url):
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(io.BytesIO(response.content))
print("\n Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print("extracted {} \n".format(file_name))
sourceZip.close()
with concurrent.futures.ThreadPoolExecutor() as exector :
exector.map(download_zip, urls)
我正在尝试使用 multiprocessing.Pool
下载和提取 zip 文件。但是每次我执行脚本时,只会下载 3 个 zips,而看不到剩余的文件在目录中(CPU % 也接近 100%)。有人可以帮助我如何解决这个 problem/suggest 更好的方法并遵循我尝试过的代码片段。我对多处理完全陌生。我的目标是在不达到最大值 CPU 的情况下并行下载多个文件。
import StringIO
import os
import sys
import zipfile
from multiprocessing import Pool, cpu_count
import requests
filePath = os.path.dirname(os.path.abspath(__file__))
print("filePath is %s " % filePath)
sys.path.append(filePath)
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
def download_zips(url):
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(StringIO.StringIO(response.content))
print("\n Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print("extracted {} \n".format(file_name))
sourceZip.close()
if __name__ == "__main__":
print("There are {} CPUs on this machine ".format(cpu_count()))
pool = Pool(cpu_count())
results = pool.map(download_zips, url)
pool.close()
pool.join()
输出低于
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
There are 4 CPUs on this machine
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
filePath is C:\Users\Documents\GitHub\Python-Examples-Internet\multi_processing
Downloaded bbcsport.zip
extracted bbcsport.zip
Downloaded 3sources.zip
extracted 3sources.zip
Downloaded multiview_data_20130124.zip
Downloaded movielists_20130821.zip
Downloaded movielists_20130821.zip
extracted multiview_data_20130124.zip
extracted movielists_20130821.zip
extracted movielists_20130821.zip
我对您的功能进行了一些小改动,并且运行良好。请注意:
- 文件
".../movielists_20130821.zip"
在您的列表中出现了两次,所以您下载了同一个东西两次(可能是打字错误?) - 文件
".../multiview_data_20130124.zip"
、".../movielists_20130821.zip"
和".../3sources.zip"
解压缩后会生成一个新目录。但是,文件".../bbcsport.zip"
在解压缩后会将其文件放在根文件夹中,即您当前的工作目录(请参见下图)。也许您错过了这张支票? - 我在下载函数中添加了一个 try/except 块。为什么?多处理通过为 运行 东西创建新的(子)进程来工作。 如果子进程抛出异常,父进程不会捕获它。所以如果在这个子流程中出现任何错误,那肯定是logged/handled那里。
import sys, os
import zipfile
import requests
from multiprocessing import Pool, cpu_count
from functools import partial
from io import BytesIO
def download_zip(url, filePath):
try:
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(BytesIO(response.content))
print(" Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print(" extracted {}".format(file_name))
sourceZip.close()
except Exception as e:
print(e)
if __name__ == "__main__":
filePath = os.path.dirname(os.path.abspath(__file__))
print("filePath is %s " % filePath)
# sys.path.append(filePath) # why do you need this?
urls = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
print("There are {} CPUs on this machine ".format(cpu_count()))
pool = Pool(cpu_count())
download_func = partial(download_zip, filePath = filePath)
results = pool.map(download_func, urls)
pool.close()
pool.join()
我建议你使用多线程来完成它,因为它是一个 I/O 绑定,如下所示:
import requests, zipfile, io
import concurrent.futures
url = ["http://mlg.ucd.ie/files/datasets/multiview_data_20130124.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/bbcsport.zip",
"http://mlg.ucd.ie/files/datasets/movielists_20130821.zip",
"http://mlg.ucd.ie/files/datasets/3sources.zip"]
def download_zips(url):
file_name = url.split("/")[-1]
response = requests.get(url)
sourceZip = zipfile.ZipFile(io.BytesIO(response.content))
print("\n Downloaded {} ".format(file_name))
sourceZip.extractall(filePath)
print("extracted {} \n".format(file_name))
sourceZip.close()
with concurrent.futures.ThreadPoolExecutor() as exector :
exector.map(download_zip, urls)