了解并发进程

Understanding a concurrent process

我正在构建一个执行特定 IO-bound 任务的脚本。我需要它来尝试下载大量数据集,在丢弃数据本身之前记录有关它们大小的某些信息。

问题是我从中获取此数据的来源不提供 content-length headers,因此无法事先知道文件大小。这要求我找到一种方法来监控下载过程需要多长时间,并有一种方法可以终止该过程并继续进行其他此类过程,以防它花费太长时间(例如超过 60 秒)。这是必要的,以避免在非常大的数据集上获得 "stuck"。

requests 不提供此功能 built-in,在花费大量时间搜索解决方案后,我决定 运行 并发进程通过 pebble library. My understanding is that this is a small extension to the standard lib multiprocessing 模块添加了一些安全功能,即错误处理和超时(这正是我想要的)。

基于 Process pool 示例,这是我的代码:

try:
    with ProcessPool(max_workers=4) as pool:
        iterator = pool.map(get_data, process_tuples[3:6], timeout=10)

        while True:
            try:
                rows, cols, filesize, i = next(iterator)
                datasets[i]['rows'] = rows
                datasets[i]['columns'] = cols
                datasets[i]['filesize'] = filesize
            except TimeoutError as error:
                print("Function took longer than %d seconds. Skipping responsible endpoint..." % error.args[1])
            except StopIteration:
                break
finally:
    with open("../../../data/" + FILE_SLUG + "/glossaries/geospatial.json", "w") as fp:
        json.dump(datasets, fp, indent=4)

但这在两个方面与预期行为不同:

  1. 我原以为 timeout=10 限制了每个下载过程(由 get_data 完成)所花费的时间。但是,当我 运行 在一个大文件上执行此操作时,我收到一个 TimeoutError 消息,表明我的进程已花费超过 30 秒。 30 是我输入长度的 3 倍;那根本不是我想要的。那里发生了什么?
  2. TimeoutError 被引发时,不是丢弃 运行 并移动到下一个(我想要的)过程跳到 finally 块(我不'想要)。我认为这是对我第一个问题的回答的结果。

其实在requests中你可以设置stream=True and use Response.iter_content()来进一步控制工作流程。

对于您的情况,我们可以在 downloading/iterating 处理响应数据时跟踪经过的时间:

import time
import requests

def get_content(url, timeout):
    """
    Get response data from url before timeout
    """
    start = time.time()
    data = ''
    response = requests.get(url, stream=True)

    for chunk in response.iter_content(chunk_size = 1024): # You can set a bigger chunk_size for less iterations
        if (time.time() - start) > timeout:
            response.close()
            return {'TimedOut': True, 'data': None}
        else:
            data += chunk

    response.close()
    return {'TimedOut': False, 'data': data}

所以基本上你设置一个timeout值,如果数据太大或者网络太慢,一旦花费超过timeout就会返回结果,那些不完整的数据将被垃圾收集。

接下来因为是IO-bound的任务,我们可以用threading或者multiprocessing来完成这个工作,下面是一个使用threading

的例子
import threading, Queue

def worker(queue):
    while not queue.empty():
        url = queue.get()

        result = get_content(url, 60)

        # Do other stuff

if __name__ == '__main__':
    limit = 10 # number of threads to use
    thread_pool = [None] * limit
    queue = Queue.Queue()
    urls = ['xxxx', 'xxxxx']

    for url in urls:
        queue.put(url)

    for thread in thread_pool:
        thread = threading.Thread(target=worker, args=(queue, ))
        thread.start()