使用并发期货时如何共享状态
How to share state when using concurrent futures
我知道使用传统的多处理库我可以声明一个值并在进程之间共享状态。
使用较新的 concurrent.futures
库时,如何在我的进程之间共享状态?
import concurrent.futures
def get_user_object(batch):
# do some work
counter = counter + 1
print(counter)
def do_multithreading(batches):
with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
threadingResult = executor.map(get_user_object, batches)
def run():
data_pools = get_data()
start = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
processResult = executor.map(do_multithreading, data_pools)
end = time.time()
print("TIME TAKEN:", end - start)
if __name__ == '__main__':
run()
我想保持这个计数器的同步值。
在以前的库中,我可能使用了 multiprocessing.Value
和 Lock
。
您可以将 initializer
和 initargs
传递给 ProcessPoolExecutor
,就像传递给 multiprocessing.Pool
一样。这是一个例子:
import concurrent.futures
import multiprocessing as mp
def get_user_object(batch):
with _COUNTER.get_lock():
_COUNTER.value += 1
print(_COUNTER.value, end=' ')
def init_globals(counter):
global _COUNTER
_COUNTER = counter
def main():
counter = mp.Value('i', 0)
with concurrent.futures.ProcessPoolExecutor(
initializer=init_globals, initargs=(counter,)
) as executor:
for _ in executor.map(get_user_object, range(10)):
pass
print()
if __name__ == "__main__":
import sys
sys.exit(main())
使用:
$ python3 glob_counter.py
1 2 4 3 5 6 7 8 10 9
其中:
for _ in executor.map(get_user_object, range(10)):
可让您遍历每个 结果 。在这种情况下,get_user_object()
returns None
,所以你真的没有什么要处理的;你只是 pass
并且没有采取进一步的行动。
- 最后一个
print()
调用给你一个额外的换行符,因为原来的 print()
调用没有使用换行符 (end=' '
')
我知道使用传统的多处理库我可以声明一个值并在进程之间共享状态。
使用较新的 concurrent.futures
库时,如何在我的进程之间共享状态?
import concurrent.futures
def get_user_object(batch):
# do some work
counter = counter + 1
print(counter)
def do_multithreading(batches):
with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
threadingResult = executor.map(get_user_object, batches)
def run():
data_pools = get_data()
start = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
processResult = executor.map(do_multithreading, data_pools)
end = time.time()
print("TIME TAKEN:", end - start)
if __name__ == '__main__':
run()
我想保持这个计数器的同步值。
在以前的库中,我可能使用了 multiprocessing.Value
和 Lock
。
您可以将 initializer
和 initargs
传递给 ProcessPoolExecutor
,就像传递给 multiprocessing.Pool
一样。这是一个例子:
import concurrent.futures
import multiprocessing as mp
def get_user_object(batch):
with _COUNTER.get_lock():
_COUNTER.value += 1
print(_COUNTER.value, end=' ')
def init_globals(counter):
global _COUNTER
_COUNTER = counter
def main():
counter = mp.Value('i', 0)
with concurrent.futures.ProcessPoolExecutor(
initializer=init_globals, initargs=(counter,)
) as executor:
for _ in executor.map(get_user_object, range(10)):
pass
print()
if __name__ == "__main__":
import sys
sys.exit(main())
使用:
$ python3 glob_counter.py
1 2 4 3 5 6 7 8 10 9
其中:
for _ in executor.map(get_user_object, range(10)):
可让您遍历每个 结果 。在这种情况下,get_user_object()
returnsNone
,所以你真的没有什么要处理的;你只是pass
并且没有采取进一步的行动。- 最后一个
print()
调用给你一个额外的换行符,因为原来的print()
调用没有使用换行符 (end=' '
')