multiprocessing.Queue() 到具有大输入大小的字典

multiprocessing.Queue() into dictionary with large input size

我有一个名为 encrypted_messages 的列表,其中包含 126018 个字符串。每个字符串都是加密的消息。我还有一个名为 decipher 的函数,给定一个字符串和一个密钥(一个从 9 到 15 的整数,两者都包括在内),returns 解密消息。我需要使用每个密钥来解密每条消息。由于 decipher 函数的计算量很大并且有很多消息,因此我实现了多处理解决方案。 我创建了一个名为 messages_queue 的 multiprocessing.JoinableQueue() 包含所有加密消息和一个名为 [=55 的 multiprocessing.Queue() =] 来存储结果。这些队列由所有进程共享。进程从 messages_queue 获取消息,使用所有密钥对它们应用 decipher 并将结果存储为 2 个元素的列表(用于解密消息和解密消息的密钥)。它看起来像这样:

[9, message_1], [15, message_2], [14, message_3], ...

results_queue 有 882126 个元素,符合预期(注意 126018*7 = 882126),其中每个元素都是一个列表。 我想从 results_queue 中获取长度为 7 的字典,其中每个键都是一个整数,每个值都是一个列表,其中包含使用该键解密的所有消息。它应该是这样的:

{9:[decrypted messages using key 9], 10:[decrypted messages using key 10], ...,
15:[decrypted messages using key 15]}

我已经尝试了几种方法来做到这一点,但我无法想出一个解决方案。我分享下面的代码:

final_results = {key:[] for key in range(9, 16)}
while not results_queue.empty():
    message = results_queue.get() # Note that this is a list: [key, message]
    final_results[message[0]].append(message[1])

我也试过首先创建一个这样的列表(我可以从列表中创建字典):

results = []
results_queue.put('STOP')
while True:
    message = results_queue.get()
    if message == 'STOP':
        break
    results.append(message)

我也试过像这样使用带有哨兵的迭代器:

results = []
results_queue.put(None)
for message in iter(results_queue.get, None):
    results.append(message)

使用所有这些方法,我丢失了很多(超过 50%)的消息。该列表应该有 882126 个列表,每次我 运行 代码它都有一个不同且更小的数字。这个数字在我看来完全是随机的。我不知道如何解决这个问题,因为当我使用更小的列表(例如 100 个元素)时,上述方法工作正常。 这个问题与输入大小有关吗?我的 multiprocessing.Queue() 太大了吗?我认为这不是进程之间的协调问题,因为我获得的 Queue() 是我所期望的,进程在此之后结束,但也许我遗漏了一些东西。

如果有用,我使用的是 Python 3.8.5 和 Linux Mint 20.2。欢迎任何帮助,因为我有点卡住了。提前致谢。

这是创建字典的代码,其中包含这样的 for

{key1:[message_1,message_2],key2:[message_3,message_4]}

message_decoded 必须是

的形状

[[key1,message1],[key2,message2]]

dict = {}

messages_decoded = []

for item in messages_decoded:

    if item[0] in dict:
        dict[item[0]].append(item[1])
    else:
        dict[item[0]] = [item[1]]

编辑

此代码将结果队列转换为列表。

list_messages = [results_queue.get() for _ in range(results_queue.qsize())]