即使队列为空,地图也不会 return
map does not return even if queue is empty
当我的队列为空时,map 方法不会return。
为了完成脚本,我可以使用 treahdpool.map_async
或 tasks.get(block=False)
。
但我想了解为什么它在完成队列后不解除阻塞threadpool.map()
。
from multiprocessing.dummy import Pool as ThreadPool
from queue import Queue
def threadworker(tasks):
while True:
# try:
# func, args, kargs = tasks.get(block=False)
# except Empty:
# break
func, args, kargs = tasks.get()
try:
func(*args, **kargs)
except Exception as e:
print(e)
finally:
tasks.task_done()
def wait_delay(d):
print('sleeping for (%d)sec' % d)
time.sleep(d)
if __name__ == '__main__':
tasks = Queue()
for d in range(1,5):
tasks.put((wait_delay, (d,), {}))
threadpool = ThreadPool(processes=2)
# threadpool.map_async(threadworker, [tasks])
threadpool.map(threadworker, [tasks]) # blocking...
tasks.join()
Queue.get() 正在阻塞它只会坐在那里等待得到什么。指定超时并通过中断 while 循环来处理异常。
from queue import Queue,Empty
def threadworker(tasks):
while True:
# try:
# func, args, kargs = tasks.get(block=False)
# except Empty:
# break
try:
func, args, kargs = tasks.get(timeout=5)
except Empty:
break
try:
func(*args, **kargs)
except Exception as e:
print(e)
finally:
tasks.task_done()
当我的队列为空时,map 方法不会return。
为了完成脚本,我可以使用 treahdpool.map_async
或 tasks.get(block=False)
。
但我想了解为什么它在完成队列后不解除阻塞threadpool.map()
。
from multiprocessing.dummy import Pool as ThreadPool
from queue import Queue
def threadworker(tasks):
while True:
# try:
# func, args, kargs = tasks.get(block=False)
# except Empty:
# break
func, args, kargs = tasks.get()
try:
func(*args, **kargs)
except Exception as e:
print(e)
finally:
tasks.task_done()
def wait_delay(d):
print('sleeping for (%d)sec' % d)
time.sleep(d)
if __name__ == '__main__':
tasks = Queue()
for d in range(1,5):
tasks.put((wait_delay, (d,), {}))
threadpool = ThreadPool(processes=2)
# threadpool.map_async(threadworker, [tasks])
threadpool.map(threadworker, [tasks]) # blocking...
tasks.join()
Queue.get() 正在阻塞它只会坐在那里等待得到什么。指定超时并通过中断 while 循环来处理异常。
from queue import Queue,Empty
def threadworker(tasks):
while True:
# try:
# func, args, kargs = tasks.get(block=False)
# except Empty:
# break
try:
func, args, kargs = tasks.get(timeout=5)
except Empty:
break
try:
func(*args, **kargs)
except Exception as e:
print(e)
finally:
tasks.task_done()