`multiprocessing` `starmap_async` 只调用一次回调?

`multiprocessing` `starmap_async` only calls callback once?

我有以下代码,它为 4 个工人创建了一个池,并调用了一个工人方法。该代码在大多数情况下都可以正常工作。当 运行 我看到正在调用不同的工作人员来处理工作。然而 calc_completed 永远不会在所有工人都完成后的最后调用一次。这是预期的行为吗?我本以为回调会在每个工人完成时发生。

def calculate_worker(x, y):
    print 'working...'
    ...

def calc_completed(result):
    print 'completed: %s'%str(result)

def calc_errored(result):
    print 'error: %s'%str(result)

if __name__ == '__main__':  
    start, stop, step = 1, 1000, 1
    ranges = [(n, min(n+step, stop)) for n in xrange(start, stop, step)]

    pool = mp.Pool(processes=8)

    res = pool.starmap_async(calculate_worker, ranges,
                             callback=calculate_worker, error_callback=calc_completed)  

    pool.close()
    pool.join()
    d = res.get()       
    print(d)

calc_completed 仅在执行映射函数时遇到任何错误时才会调用(此处:calculate_worker)。

您的代码中的另一个问题是您 运行 calculate_worker 并行运行 并将其用作 callback。这没有多大意义,因为 calculate_worker 将被调用两次 - 第一次:作为 worker 函数,其次:作为报告计算已完成的函数。你应该有两个不同的功能。

鉴于您提供的代码片段中的功能,我将按以下方式对其进行更改:

res = pool.starmap_async(calculate_worker, ranges,
                         callback=calc_completed, 
                         error_callback=calc_errored)  

如果你想测试 calc_errored 是否被正确调用,那么你可以在 calculate_worker 函数中引入一些随机错误,看看它是否会被处理,例如

def calculate_worker(x, y):
    if (x % 7):
      x / (y - y)  # division by zero
    print 'working...'