以 concurrent.futures.ThreadPoolExecutor() 作为执行者:...不等待
with concurrent.futures.ThreadPoolExecutor() as executor: ... does not wait
我正在尝试在 class 的方法中使用 ThreadPoolExecutor()
创建一个线程池,该线程池将在同一个 class 中执行另一个方法。我有 with concurrent.futures.ThreadPoolExecutor()...
但它不等待,并抛出一个错误,说我在 "with..." 语句后查询的字典中没有键。我明白了为什么会抛出错误,因为字典还没有更新,因为池中的线程没有完成执行。我知道线程没有完成执行,因为我在 ThreadPoolExecutor 中调用的方法中有一个 print("done"),并且 "done" 没有打印到控制台。
我是线程的新手,所以如果有任何关于如何更好地做到这一点的建议,我们将不胜感激!
def tokenizer(self):
all_tokens = []
self.token_q = Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for num in range(5):
executor.submit(self.get_tokens, num)
executor.shutdown(wait=True)
print("Hi")
results = {}
while not self.token_q.empty():
temp_result = self.token_q.get()
results[temp_result[1]] = temp_result[0]
print(temp_result[1])
for index in range(len(self.zettels)):
for zettel in results[index]:
all_tokens.append(zettel)
return all_tokens
def get_tokens(self, thread_index):
print("!!!!!!!")
switch = {
0: self.zettels[:(len(self.zettels)/5)],
1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
}
new_tokens = []
for zettel in switch.get(thread_index):
tokens = re.split('\W+', str(zettel))
tokens = list(filter(None, tokens))
new_tokens.append(tokens)
print("done")
self.token_q.put([new_tokens, thread_index])
'''
预计会在 print ("Hi")
语句之前看到所有 print("!!!!!!")
和 print("done")
语句。
实际上显示 !!!!!!!
,然后是 Hi
,然后是结果字典的 KeyError
。
您需要循环 concurrent.futures.as_completed(),如图 here 所示。它会在每个线程完成时产生值。
如您所知,池正在等待; print('done')
永远不会执行,因为可能 TypeError
提早了。
池不直接等待任务完成,它等待其工作线程加入,这隐含地要求任务的执行完成,一种方式(成功)或另一种方式(异常)。
您看不到引发异常的原因是任务被包装在 Future
中。一个Future
[...] encapsulates the asynchronous execution of a callable.
Future
实例由执行程序的 submit
方法返回,它们允许查询执行状态并访问其结果。
这让我想到了一些我想说的话。
self.token_q
中的Queue
好像没必要
从您共享的代码来看,您仅使用此队列将任务结果传递回 tokenizer
函数。这不是必需的,您可以从调用 submit
returns:
的 Future
访问它
def tokenizer(self):
all_tokens = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(get_tokens, num) for num in range(5)]
# executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
# https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623
print("Hi")
results = {}
for fut in futures:
try:
res = fut.result()
results[res[1]] = res[0]
except Exception:
continue
[...]
def get_tokens(self, thread_index):
[...]
# instead of self.token_q.put([new_tokens, thread_index])
return new_tokens, thread_index
您的程序可能无法从使用线程中获益
从您共享的代码来看,get_tokens
中的操作似乎是 CPU 绑定的,而不是 I/O 绑定的。如果你的程序是 运行 CPython(或任何其他使用 Global Interpreter Lock 的解释器),在这种情况下使用线程将没有任何好处。
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.
这意味着对于任何 Python 进程,在任何给定时间只能执行一个线程。如果您手头的任务是 I/O 绑定,即经常暂停以等待 I/O(例如,对于套接字上的数据),这不是什么大问题。如果您的任务需要在处理器中不断执行字节码,那么暂停一个线程让另一个线程执行一些指令没有任何好处。事实上,由此产生的上下文切换甚至可能是有害的。
为此,您可能想要 parallelism instead of concurrency. Take a look at ProcessPoolExecutor
。
但是,我建议按顺序、并发和并行地 运行 对您的代码进行基准测试。创建进程或线程是有代价的,并且根据要完成的任务,这样做可能比按顺序执行一个任务一个接一个地花费更长的时间。
顺便说一句,这看起来有点可疑:
for index in range(len(self.zettels)):
for zettel in results[index]:
all_tokens.append(zettel)
results
似乎总是有五个项目,因为 for num in range(5)
。如果 self.zettels
的长度大于五,我希望这里会出现一个 KeyError
。
如果保证 self.zettels
的长度为五,那么我会在这里看到一些代码优化的潜力。
我正在尝试在 class 的方法中使用 ThreadPoolExecutor()
创建一个线程池,该线程池将在同一个 class 中执行另一个方法。我有 with concurrent.futures.ThreadPoolExecutor()...
但它不等待,并抛出一个错误,说我在 "with..." 语句后查询的字典中没有键。我明白了为什么会抛出错误,因为字典还没有更新,因为池中的线程没有完成执行。我知道线程没有完成执行,因为我在 ThreadPoolExecutor 中调用的方法中有一个 print("done"),并且 "done" 没有打印到控制台。
我是线程的新手,所以如果有任何关于如何更好地做到这一点的建议,我们将不胜感激!
def tokenizer(self):
all_tokens = []
self.token_q = Queue()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for num in range(5):
executor.submit(self.get_tokens, num)
executor.shutdown(wait=True)
print("Hi")
results = {}
while not self.token_q.empty():
temp_result = self.token_q.get()
results[temp_result[1]] = temp_result[0]
print(temp_result[1])
for index in range(len(self.zettels)):
for zettel in results[index]:
all_tokens.append(zettel)
return all_tokens
def get_tokens(self, thread_index):
print("!!!!!!!")
switch = {
0: self.zettels[:(len(self.zettels)/5)],
1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
}
new_tokens = []
for zettel in switch.get(thread_index):
tokens = re.split('\W+', str(zettel))
tokens = list(filter(None, tokens))
new_tokens.append(tokens)
print("done")
self.token_q.put([new_tokens, thread_index])
'''
预计会在 print ("Hi")
语句之前看到所有 print("!!!!!!")
和 print("done")
语句。
实际上显示 !!!!!!!
,然后是 Hi
,然后是结果字典的 KeyError
。
您需要循环 concurrent.futures.as_completed(),如图 here 所示。它会在每个线程完成时产生值。
如您所知,池正在等待; print('done')
永远不会执行,因为可能 TypeError
提早了。
池不直接等待任务完成,它等待其工作线程加入,这隐含地要求任务的执行完成,一种方式(成功)或另一种方式(异常)。
您看不到引发异常的原因是任务被包装在 Future
中。一个Future
[...] encapsulates the asynchronous execution of a callable.
Future
实例由执行程序的 submit
方法返回,它们允许查询执行状态并访问其结果。
这让我想到了一些我想说的话。
self.token_q
中的Queue
好像没必要
从您共享的代码来看,您仅使用此队列将任务结果传递回 tokenizer
函数。这不是必需的,您可以从调用 submit
returns:
Future
访问它
def tokenizer(self):
all_tokens = []
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(get_tokens, num) for num in range(5)]
# executor.shutdown(wait=True) here is redundant, it is called when exiting the context:
# https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/_base.py#L623
print("Hi")
results = {}
for fut in futures:
try:
res = fut.result()
results[res[1]] = res[0]
except Exception:
continue
[...]
def get_tokens(self, thread_index):
[...]
# instead of self.token_q.put([new_tokens, thread_index])
return new_tokens, thread_index
您的程序可能无法从使用线程中获益
从您共享的代码来看,get_tokens
中的操作似乎是 CPU 绑定的,而不是 I/O 绑定的。如果你的程序是 运行 CPython(或任何其他使用 Global Interpreter Lock 的解释器),在这种情况下使用线程将没有任何好处。
In CPython, the global interpreter lock, or GIL, is a mutex that protects access to Python objects, preventing multiple threads from executing Python bytecodes at once.
这意味着对于任何 Python 进程,在任何给定时间只能执行一个线程。如果您手头的任务是 I/O 绑定,即经常暂停以等待 I/O(例如,对于套接字上的数据),这不是什么大问题。如果您的任务需要在处理器中不断执行字节码,那么暂停一个线程让另一个线程执行一些指令没有任何好处。事实上,由此产生的上下文切换甚至可能是有害的。
为此,您可能想要 parallelism instead of concurrency. Take a look at ProcessPoolExecutor
。
但是,我建议按顺序、并发和并行地 运行 对您的代码进行基准测试。创建进程或线程是有代价的,并且根据要完成的任务,这样做可能比按顺序执行一个任务一个接一个地花费更长的时间。
顺便说一句,这看起来有点可疑:
for index in range(len(self.zettels)):
for zettel in results[index]:
all_tokens.append(zettel)
results
似乎总是有五个项目,因为 for num in range(5)
。如果 self.zettels
的长度大于五,我希望这里会出现一个 KeyError
。
如果保证 self.zettels
的长度为五,那么我会在这里看到一些代码优化的潜力。