了解并发进程
Understanding a concurrent process
我正在构建一个执行特定 IO-bound 任务的脚本。我需要它来尝试下载大量数据集,在丢弃数据本身之前记录有关它们大小的某些信息。
问题是我从中获取此数据的来源不提供 content-length
headers,因此无法事先知道文件大小。这要求我找到一种方法来监控下载过程需要多长时间,并有一种方法可以终止该过程并继续进行其他此类过程,以防它花费太长时间(例如超过 60 秒)。这是必要的,以避免在非常大的数据集上获得 "stuck"。
requests
不提供此功能 built-in,在花费大量时间搜索解决方案后,我决定 运行 并发进程通过 pebble
library. My understanding is that this is a small extension to the standard lib multiprocessing
模块添加了一些安全功能,即错误处理和超时(这正是我想要的)。
基于 Process pool 示例,这是我的代码:
try:
with ProcessPool(max_workers=4) as pool:
iterator = pool.map(get_data, process_tuples[3:6], timeout=10)
while True:
try:
rows, cols, filesize, i = next(iterator)
datasets[i]['rows'] = rows
datasets[i]['columns'] = cols
datasets[i]['filesize'] = filesize
except TimeoutError as error:
print("Function took longer than %d seconds. Skipping responsible endpoint..." % error.args[1])
except StopIteration:
break
finally:
with open("../../../data/" + FILE_SLUG + "/glossaries/geospatial.json", "w") as fp:
json.dump(datasets, fp, indent=4)
但这在两个方面与预期行为不同:
- 我原以为
timeout=10
限制了每个下载过程(由 get_data
完成)所花费的时间。但是,当我 运行 在一个大文件上执行此操作时,我收到一个 TimeoutError
消息,表明我的进程已花费超过 30 秒。 30 是我输入长度的 3 倍;那根本不是我想要的。那里发生了什么?
- 当
TimeoutError
被引发时,不是丢弃 运行 并移动到下一个(我想要的)过程跳到 finally
块(我不'想要)。我认为这是对我第一个问题的回答的结果。
其实在requests
中你可以设置stream=True
and use Response.iter_content()
来进一步控制工作流程。
对于您的情况,我们可以在 downloading/iterating 处理响应数据时跟踪经过的时间:
import time
import requests
def get_content(url, timeout):
"""
Get response data from url before timeout
"""
start = time.time()
data = ''
response = requests.get(url, stream=True)
for chunk in response.iter_content(chunk_size = 1024): # You can set a bigger chunk_size for less iterations
if (time.time() - start) > timeout:
response.close()
return {'TimedOut': True, 'data': None}
else:
data += chunk
response.close()
return {'TimedOut': False, 'data': data}
所以基本上你设置一个timeout
值,如果数据太大或者网络太慢,一旦花费超过timeout
就会返回结果,那些不完整的数据将被垃圾收集。
接下来因为是IO-bound的任务,我们可以用threading
或者multiprocessing
来完成这个工作,下面是一个使用threading
的例子
import threading, Queue
def worker(queue):
while not queue.empty():
url = queue.get()
result = get_content(url, 60)
# Do other stuff
if __name__ == '__main__':
limit = 10 # number of threads to use
thread_pool = [None] * limit
queue = Queue.Queue()
urls = ['xxxx', 'xxxxx']
for url in urls:
queue.put(url)
for thread in thread_pool:
thread = threading.Thread(target=worker, args=(queue, ))
thread.start()
我正在构建一个执行特定 IO-bound 任务的脚本。我需要它来尝试下载大量数据集,在丢弃数据本身之前记录有关它们大小的某些信息。
问题是我从中获取此数据的来源不提供 content-length
headers,因此无法事先知道文件大小。这要求我找到一种方法来监控下载过程需要多长时间,并有一种方法可以终止该过程并继续进行其他此类过程,以防它花费太长时间(例如超过 60 秒)。这是必要的,以避免在非常大的数据集上获得 "stuck"。
requests
不提供此功能 built-in,在花费大量时间搜索解决方案后,我决定 运行 并发进程通过 pebble
library. My understanding is that this is a small extension to the standard lib multiprocessing
模块添加了一些安全功能,即错误处理和超时(这正是我想要的)。
基于 Process pool 示例,这是我的代码:
try:
with ProcessPool(max_workers=4) as pool:
iterator = pool.map(get_data, process_tuples[3:6], timeout=10)
while True:
try:
rows, cols, filesize, i = next(iterator)
datasets[i]['rows'] = rows
datasets[i]['columns'] = cols
datasets[i]['filesize'] = filesize
except TimeoutError as error:
print("Function took longer than %d seconds. Skipping responsible endpoint..." % error.args[1])
except StopIteration:
break
finally:
with open("../../../data/" + FILE_SLUG + "/glossaries/geospatial.json", "w") as fp:
json.dump(datasets, fp, indent=4)
但这在两个方面与预期行为不同:
- 我原以为
timeout=10
限制了每个下载过程(由get_data
完成)所花费的时间。但是,当我 运行 在一个大文件上执行此操作时,我收到一个TimeoutError
消息,表明我的进程已花费超过 30 秒。 30 是我输入长度的 3 倍;那根本不是我想要的。那里发生了什么? - 当
TimeoutError
被引发时,不是丢弃 运行 并移动到下一个(我想要的)过程跳到finally
块(我不'想要)。我认为这是对我第一个问题的回答的结果。
其实在requests
中你可以设置stream=True
and use Response.iter_content()
来进一步控制工作流程。
对于您的情况,我们可以在 downloading/iterating 处理响应数据时跟踪经过的时间:
import time
import requests
def get_content(url, timeout):
"""
Get response data from url before timeout
"""
start = time.time()
data = ''
response = requests.get(url, stream=True)
for chunk in response.iter_content(chunk_size = 1024): # You can set a bigger chunk_size for less iterations
if (time.time() - start) > timeout:
response.close()
return {'TimedOut': True, 'data': None}
else:
data += chunk
response.close()
return {'TimedOut': False, 'data': data}
所以基本上你设置一个timeout
值,如果数据太大或者网络太慢,一旦花费超过timeout
就会返回结果,那些不完整的数据将被垃圾收集。
接下来因为是IO-bound的任务,我们可以用threading
或者multiprocessing
来完成这个工作,下面是一个使用threading
import threading, Queue
def worker(queue):
while not queue.empty():
url = queue.get()
result = get_content(url, 60)
# Do other stuff
if __name__ == '__main__':
limit = 10 # number of threads to use
thread_pool = [None] * limit
queue = Queue.Queue()
urls = ['xxxx', 'xxxxx']
for url in urls:
queue.put(url)
for thread in thread_pool:
thread = threading.Thread(target=worker, args=(queue, ))
thread.start()