multiprocessing.Queue.get() takes a very long time in Python
I need to fetch very large data chunks by iterating over my data. In total I need several million iterations. So I thought subprocessing would speed up my process, and it almost did. I use a `multiprocessing.Queue` to communicate with the different worker processes, which actually works fine, but when I call `multiprocessing.Queue.get()` the program takes a very long time to get the results. Maybe I did something wrong. Here is my minimal example:
def get_losses(self, tags=None):
    return_dict = {}
    output_list = multiprocessing.Queue()
    process_list = []
    # Create queue definition
    for experiment, path in self.tf_board_dicts.items():
        t = multiprocessing.Process(target=self._load_vec_from_tfboard,
                                    args=(path, tags, experiment))
        process_list.append(t)
    print("Starting subprocesses with a total of {} workers.\nThese are {}".format(len(process_list),
                                                                                   process_list))
    # Run processes
    for p in process_list:
        p.start()
    # Wait for the finished processes
    for p in process_list:
        p.join()
    print("All subprocesses are terminated")
    # Get results
    results = [output_list.get() for p in process_list]
    print("All losses are gathered: {}".format([tup[0] for tup in results]))
    # Create dict
    for experiment_losses in results:
        return_dict[experiment_losses[0]] = experiment_losses[1]
    return return_dict
The answer to this problem of the queue hanging indefinitely can be found here: Python Processes not joining
This happens because the `Queue` uses a buffer internally when a lot of data is being pushed into it. The process writing to the `Queue` can't exit until that buffer gets flushed, which won't happen until you start pulling things out of the `Queue`. So, because you're trying to join all the processes before you pull anything out of the `Queue` objects they're writing to, they can't exit, so the `join` hangs. You could fix the issue by draining the `Queue` prior to calling `join` on the processes. – dano Sep 25 '14 at 16:16
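A minimal sketch of that fix applied to the method above: drain the queue first, then join. Since the original snippet never actually passes `output_list` to the workers, the sketch assumes (this is an assumption, not something the question shows) that `self._load_vec_from_tfboard` takes the queue as its first argument and puts a single `(experiment, losses)` tuple on it.

def get_losses(self, tags=None):
    output_queue = multiprocessing.Queue()
    process_list = []
    # One worker per experiment. Assumed worker signature (not in the
    # original): _load_vec_from_tfboard(queue, path, tags, experiment),
    # which puts one (experiment, losses) tuple on the queue.
    for experiment, path in self.tf_board_dicts.items():
        p = multiprocessing.Process(target=self._load_vec_from_tfboard,
                                    args=(output_queue, path, tags, experiment))
        process_list.append(p)
    for p in process_list:
        p.start()
    # Drain the queue BEFORE joining: pulling the results out lets each
    # worker's feeder thread flush its buffer, so the worker can exit.
    results = [output_queue.get() for _ in process_list]
    # join() now returns promptly because the buffers are already empty.
    for p in process_list:
        p.join()
    return {experiment: losses for experiment, losses in results}

An alternative that avoids the ordering problem entirely is `multiprocessing.Pool`, whose `map`/`starmap` collect the results for you without a hand-rolled queue.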