Python: How to asynchronize a for loop

Is it possible to iterate over a generator object in Python with asyncio? I created a simple function named hash_generator() that returns a unique hash. Now I decided to benchmark the loop, and it takes roughly 8 seconds to iterate and print 100,000 hashes. Can I run this asynchronously to minimize the time? I read the asyncio documentation, but I'm confused. I want to explore async, and this is the problem I'd like to start with.

import hashlib
import string
import random
import time


def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


"""Iterating the hashes and printing the time it loaded"""
hashes = (hash_generator() for i in range(100000))
time_before = time.time()
[print(i) for i in hashes]
time_after = time.time()
difference = time_after - time_before
print('Loaded in {0:.2f}sec'.format(difference))
# 40503CBA2DAE
# ...
# A511068F4945
# Loaded in 8.81sec
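
For reference, here is a minimal sketch of what feeding the same loop through asyncio might look like (assuming Python 3.9+ for asyncio.to_thread); whether it helps at all is exactly what I'm unsure about:

import asyncio

async def main():
    # hash_generator() is the function defined above. Each call is handed
    # to the default thread pool; the event loop just waits for them all.
    tasks = [asyncio.to_thread(hash_generator) for _ in range(100000)]
    for value in await asyncio.gather(*tasks):
        print(value)

asyncio.run(main())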

Edit 1

The random.choice() function was the main reason the program took so long to run. I rewrote the function below to use the current time plus a random string from os.urandom (low collision risk) as the value. I tried multithreading, but instead of making the task run as fast as possible it turned out to be too slow. Any suggestions for refactoring the code below are always welcome.

import hashlib
import time
import os
import timeit


def hash_generator():
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


"""Iterating the hashes and printing the time it loaded"""
print(timeit.timeit(hash_generator, number=100000), "sec")
# 0.497149389999322 sec
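
To double-check that random.choice() really was the bottleneck, here is a small timeit sketch (the helper names below are just for illustration) that times only the suffix-building step of each version:

import os
import random
import string
import timeit

def suffix_choice():
    # Original suffix: 10 random letters via random.choice()
    return "".join(random.choice(string.ascii_letters) for _ in range(10))

def suffix_urandom():
    # Revised suffix: 10 random bytes from the OS
    return str(os.urandom(10))

print("random.choice:", timeit.timeit(suffix_choice, number=100000), "sec")
print("os.urandom:  ", timeit.timeit(suffix_urandom, number=100000), "sec")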

Edit 2

With the help of Jack Taylor and fellow Stack Overflow users, I can see a difference when using multiprocessing beyond 1M iterations. I benchmarked the code below.

import hashlib
import time
import os
import timeit
import multiprocessing


def hash_generator(_=None):  # the unused argument lets pool.map() drive this function
    """Return a unique hash"""
    prefix = str(time.time())
    suffix = str(os.urandom(10))
    key = "".join([prefix, suffix])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()


# Allows for the safe importing of the main module
if __name__ == "__main__":
    start_time = time.time()
    number_processes = 4
    iteration = 10000000
    pool = multiprocessing.Pool(number_processes)
    results = pool.map(hash_generator, range(iteration))
    pool.close()
    pool.join()
    end_time = time.time()
    pool_runtime = end_time - start_time
    print('(Pool) Loaded in: {0:.5f} sec'.format(pool_runtime))

    ordinary_runtime = timeit.timeit(hash_generator, number=iteration)
    print('(Ordinary) Loaded in: {0:.5f} sec'.format(ordinary_runtime))

iteration = 10
(Pool) Loaded in: 1.20685 sec
(Ordinary) Loaded in: 0.00023 sec

iteration = 1,000
(Pool) Loaded in: 0.72233 sec
(Ordinary) Loaded in: 0.01767 sec

iteration = 1,000
(Pool) Loaded in: 0.99571 sec
(Ordinary) Loaded in: 0.01208 sec

iteration = 10,000
(Pool) Loaded in: 1.07876 sec
(Ordinary) Loaded in: 0.12652 sec

iteration = 100,000
(Pool) Loaded in: 1.57068 sec
(Ordinary) Loaded in: 1.23418 sec

iteration = 1,000,000
(Pool) Loaded in: 4.28724 sec
(Ordinary) Loaded in: 11.56332 sec

iteration = 10,000,000
(Pool) Loaded in: 27.26819 sec
(Ordinary) Loaded in: 132.68170 sec

It looks like you may be better off sticking with the sequential version. The conventional wisdom is that in Python, for I/O-bound jobs (file reads/writes, network) you can get a speed-up by using an event loop or multiple threads, while for CPU-bound jobs (like computing hashes) you can get a speed-up by using multiple processes.
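
To illustrate the I/O-bound case, here is a small sketch (the URL list, timeout and worker count are made up for the example) where a thread pool does pay off because the threads spend most of their time waiting on the network:

from concurrent import futures
from urllib import request

# Made-up list of URLs, standing in for an I/O-bound workload.
urls = ["https://example.com"] * 20

def fetch(url):
    # While a thread waits on the network it releases the GIL,
    # so the other threads can make progress in the meantime.
    with request.urlopen(url, timeout=10) as resp:
        return resp.status

with futures.ThreadPoolExecutor(max_workers=8) as executor:
    print(list(executor.map(fetch, urls)))

The hash loop never spends time waiting like this, which is why an event loop or threads don't buy it anything.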

However, I took your version and rewrote it to use concurrent.futures with a process pool, and rather than speeding it up it made it about 10 times slower.

Here is the code:

from concurrent import futures
import hashlib
import string
import random
import time

def hash_generator():
    """Return a unique hash"""
    prefix = int(time.time())
    suffix = (random.choice(string.ascii_letters) for i in range(10))
    key = ".".join([str(prefix), str("".join(suffix))])
    value = hashlib.blake2b(key.encode(), digest_size=6).hexdigest()
    return value.upper()

def main(workers = None):
    """Iterating the hashes and printing the time it loaded"""
    time_before = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        worker_count = executor._max_workers  # private attribute, used only to report the pool size
        jobs = (executor.submit(hash_generator) for i in range(100000))
        for future in futures.as_completed(jobs):
            print(future.result())
    time_after = time.time()
    difference = time_after - time_before
    print('Loaded in {0:.2f}sec with {1} workers'.format(difference, worker_count))

if __name__ == '__main__':
    main()

# 2BD6056CC0B4
# ...
# D0A6707225EB
# Loaded in 50.74sec with 4 workers

With multiple processes there is some overhead from starting and stopping the different processes and from inter-process communication, which is probably why the multiprocessing version comes out slower than the sequential one even though it uses all the CPU cores.
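
Much of that overhead comes from submitting 100,000 separate futures and shipping every result back on its own. A sketch of the same job using executor.map() with a chunksize, so that arguments and results travel in batches of 1,000 rather than one at a time, might cut that down (I haven't benchmarked this variant):

from concurrent import futures
import hashlib
import os
import time

def hash_generator(_=None):
    """Return a unique hash (the Edit 2 version, which ignores its argument)."""
    key = "".join([str(time.time()), str(os.urandom(10))])
    return hashlib.blake2b(key.encode(), digest_size=6).hexdigest().upper()

def main(workers=None):
    time_before = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        # chunksize batches the calls, so each round trip to a worker
        # carries 1,000 arguments and 1,000 results instead of one.
        results = list(executor.map(hash_generator, range(100000), chunksize=1000))
    print('Loaded in {0:.2f}sec'.format(time.time() - time_before))

if __name__ == '__main__':
    main()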

You could also try splitting the work across multiple machines using a cluster, and/or writing the algorithm in a lower-level language (Go seems like a good choice to me). Whether that would be worth your time, though, I can't say.