Avoiding race conditions while using ThreadPoolExecutor

I have the following method, concurrent_api_call_and_processing(), which takes the parameters below:

I make roughly 500 HTTP requests, one for each id in lst, using api_call(). Each response is then processed with callback_processing(), a local method that parses XML and returns a tuple.

from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def concurrent_api_call_and_processing(api_call=None, callback_processing=None, lst=None, workers=5):
    """
    :param api_call: Function that will be called concurrently. An API call to API_Provider for each entry.
    :param lst: List of finding's ids needed by the API function to call API_Provider endpoint.
    :param callback_processing: Function that will be called after we get the response from the above API call.
    :param workers: Number of concurrent threads that will be used.
    :return: array of tuples containing the details of each particular finding.
    """
    output = Queue()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, f_id): f_id for f_id in lst}
        for future in as_completed(future_to_f_detail):
            f_id = future_to_f_detail[future]  # look up the id this future belongs to
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {f_id} generated an exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.put(f_det)
    return output

I started noticing some random issues while using this method (it did not always terminate cleanly).

Since I was originally using a list instead of a queue (output=[]) and wasn't sure whether I had a race condition, I decided to refactor the code and switch to a Queue (output=Queue()).

My question is: does the code above have a race condition?

Note: I want to point out that, following Raymond Hettinger's keynote on concurrency (PyBay 2017), I added fuzz() sleep calls for testing, but I could not determine whether I actually have a race condition.

I don't think there is enough information to determine that.

Consider what happens if an api_call function that increments a global variable is passed in:

count = 0

def api_call_fn():
    global count
    count += 1  # read-modify-write: not atomic across threads

When this is executed concurrently, there will be a race condition on the increment of the count variable.

The same goes for the callback_processing function.


To audit this code for race conditions, we would have to look at the definitions of both of those functions :)
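
To make that failure mode concrete, here is a minimal sketch of my own (not the asker's code), using a fuzz()-style random sleep, as in Hettinger's keynote, to widen the race window:

    import random
    import time
    from concurrent.futures import ThreadPoolExecutor

    count = 0

    def fuzz():
        # Random sleep to widen the window between the read and the write
        time.sleep(random.random() / 100)

    def api_call_fn():
        global count
        old = count       # read
        fuzz()
        count = old + 1   # write: another thread may have updated count in between

    with ThreadPoolExecutor(max_workers=5) as executor:
        for _ in range(500):
            executor.submit(api_call_fn)

    print(count)  # typically far less than 500: increments were lost to the race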

Under the above conditions, there won't be a race condition in that code. As per the concurrent.futures docs, here is what happens:

  1. executor.submit(): Returns a Future object representing the execution of the callable.
  2. as_completed(future_to_f_detail): Returns an iterator over the Future instances given by future_to_f_detail that yields futures as they complete (finished or canceled futures).

So the for loop is indeed consuming the iterator, receiving one by one every future yielded by as_completed().

So unless callback_processing() or the function we call introduces some kind of asynchronous behavior (as in the example described by @dm03514 above), we are working purely synchronously inside the for loop.
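
A quick way to see this is to print which thread each step runs on; in this sketch of mine (with hypothetical api_call/callback_processing stand-ins), the processing after future.result() always happens on the main thread:

    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def api_call(f_id):
        # Runs on one of the pool's worker threads
        return f_id, threading.current_thread().name

    def callback_processing(result):
        # Runs in the for loop below, i.e. only on the calling (main) thread
        f_id, worker = result
        return f"id {f_id}: fetched on {worker}, processed on {threading.current_thread().name}"

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {executor.submit(api_call, f_id): f_id for f_id in range(10)}
        for future in as_completed(futures):
            print(callback_processing(future.result()))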

    counter = 0
    output = []
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_f_detail = {executor.submit(api_call, f_id): f_id for f_id in lst}
        for future in as_completed(future_to_f_detail):
            print(f"Entering the for loop for the {counter + 1} time")
            counter += 1
            try:
                find_details = future.result()
            except Exception as exc:
                print(f"Finding {future_to_f_detail[future]} generated an exception: {exc}")
            else:
                f_det = callback_processing(find_details)
                output.append(f_det)
    return output

If we have a list of 500 ids and we make 500 calls, and every call yields a future, we will print that message 500 times, once each time before entering the try block.

We are not forced to use a Queue to avoid a race condition in this case. Futures give us deferred execution: when we use submit() we get back a Future to be used later.
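
For illustration, a tiny sketch of that deferred execution, using the built-in pow as a stand-in task:

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(pow, 2, 10)  # schedule the call; returns immediately
        print(future.done())    # may be False: the work is deferred to a worker thread
        print(future.result())  # blocks until the result is ready, then prints 1024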

Some important notes and recommendations:

  1. Ramalho, Luciano: Fluent Python, Chapter 17, "Concurrency with Futures".
  2. Beazley, David: Python Cookbook, Chapter 12, "Concurrency", p. 516, "Defining an Actor Task".