如何使用多处理在一个非常大的列表中删除重复项?
How to use multiprocessing to drop duplicates in a very big list?
假设我有一个包含随机数的巨大列表,例如
L = [random.randrange(0,25000000000) for _ in range(1000000000)]
我需要删除此列表中的重复项
我为包含较少元素的列表编写了这段代码
def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
if i not in seen:
result.append(i)
seen.add(i)
return result
在上面的代码中,我创建了一个集合,这样我就可以记住哪些数字已经出现在我正在处理的列表中,如果该数字不在集合中,那么我将它添加到我需要的结果列表中return 并将其保存在集合中,这样它就不会再次添加到结果列表中
现在,对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但是对于比 1000000000 个更好的数字,问题出现了,我需要在我的机器上使用不同的内核来尝试解决问题,然后合并来自多个进程的结果
我的第一个猜测是让所有进程都可以访问一个集合,但是会出现很多复杂情况
一个进程如何读取,而另一个进程可能正在添加到集合中,我什至不知道是否可以在进程之间共享一个集合我知道我们可以使用队列或管道,但我不确定如何使用它
谁能给我一个解决这个问题的最佳方法的建议
我乐于接受任何新想法
不能说我喜欢这个,但它应该会起作用,毕竟是时尚。
将数据分成N个只读块。为每个工人分配一个以研究数据。一切都是只读的,所以都可以共享。每个工作人员 i 1...N 对照所有其他 'future' 列表检查其列表 i+1...N
每个工人 i 为其 i+1...N 列表维护一点 table它的项目命中任何未来的项目。
当每个人都完成后,worker i 将它的位 table 发送回 master,在那里 tit 可以被 ANDed。然后零被删除。没有排序没有集合。检查速度不快,不过。
如果你不想打扰多个位 tables 你可以让每个工人 i 在他们发现自己区域上方的 dup 时写零责任。但是,现在您 运行 陷入 真正的共享内存问题 。就此而言,您甚至可以让每个作品只删除其区域上方的 dup,但同上。
连分工都回避了问题。对于每个工作人员来说,为自己的每个条目遍历其他人的列表是很昂贵的。 *(N-1)len(地区)/2。每个 worker 都可以创建一组它的区域,或者对它的区域进行排序。两者都可以加快检查速度,但成本会增加。
我怀疑即使您的最大列表也足够大,以至于多处理会改善时间安排。使用 numpy 和 多线程 可能是你最好的机会。
多处理引入了相当多的开销并增加了内存消耗,就像前面正确提到的 @Frank Merrow 一样。
但是,多线程并非如此(扩展)。重要的是不要混淆这些术语,因为进程和线程是不同的。
同一个进程中的线程共享它们的内存,不同的进程不共享。
在 Python 中使用多核的问题是 GIL,它不允许多个线程(在同一进程中)并行执行 Python 字节码。像 numpy 这样的一些 C 扩展可以释放 GIL,这可以从多线程的多核并行中获益。这是您仅通过使用 numpy 就可以在重大改进之上获得一些速度的机会。
from multiprocessing.dummy import Pool # .dummy uses threads
import numpy as np
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8
result = np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
使用 numpy 和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一。
最终删除重组数组的重复项是必要的,因为在子数组中只能识别 local 重复项。
对于低熵数据(许多重复项)使用pandas.unique
instead of numpy.unique
can be much faster. Unlike numpy.unique
它也保留了出现顺序。
请注意,只有在 numpy 函数 尚未多线程 under the hood 通过调用低级数学时,使用像上面这样的线程池才有意义图书馆。因此,请始终测试它是否真的提高了性能,不要认为这是理所当然的。
使用以下范围内的 100M 个随机生成的整数进行测试:
- 高熵:0 - 25_000_000_000(199560 次重复)
- 低熵:0 - 1000
代码
import time
import timeit
from multiprocessing.dummy import Pool # .dummy uses threads
import numpy as np
import pandas as pd
def time_stmt(stmt, title=None):
t = timeit.repeat(
stmt=stmt,
timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
)
print(f"\t{title or stmt}")
print(f"\t\t{min(t) / 1e9:.2f} s")
if __name__ == '__main__':
n_threads = 8 # machine with 8 cores (4 physical cores)
stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""
stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
# -------------------------------------------------------------------------
print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
# ---
print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
r = np.random.RandomState(42).randint(0, 1000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique() & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
就像您在下面的计时中看到的那样,仅使用不带多线程的 numpy 已经带来了最大的性能改进。另请注意,pandas.unique()
比 numpy.unique()
(仅)快于许多重复项。
high entropy (few duplicates) ------------------------------
list(set(r))
32.76 s
np.unique(r).tolist()
12.32 s
pd.unique(r).tolist()
23.01 s
numpy.unique() & Pool
9.75 s
pandas.unique() & Pool
28.91 s
low entropy (many duplicates) ------------------------------
list(set(r))
5.66 s
np.unique(r).tolist()
4.59 s
pd.unique(r).tolist()
0.75 s
numpy.unique() & Pool
1.17 s
pandas.unique() & Pool
0.19 s
假设我有一个包含随机数的巨大列表,例如
L = [random.randrange(0,25000000000) for _ in range(1000000000)]
我需要删除此列表中的重复项
我为包含较少元素的列表编写了这段代码
def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
if i not in seen:
result.append(i)
seen.add(i)
return result
在上面的代码中,我创建了一个集合,这样我就可以记住哪些数字已经出现在我正在处理的列表中,如果该数字不在集合中,那么我将它添加到我需要的结果列表中return 并将其保存在集合中,这样它就不会再次添加到结果列表中
现在,对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但是对于比 1000000000 个更好的数字,问题出现了,我需要在我的机器上使用不同的内核来尝试解决问题,然后合并来自多个进程的结果
我的第一个猜测是让所有进程都可以访问一个集合,但是会出现很多复杂情况 一个进程如何读取,而另一个进程可能正在添加到集合中,我什至不知道是否可以在进程之间共享一个集合我知道我们可以使用队列或管道,但我不确定如何使用它
谁能给我一个解决这个问题的最佳方法的建议 我乐于接受任何新想法
不能说我喜欢这个,但它应该会起作用,毕竟是时尚。
将数据分成N个只读块。为每个工人分配一个以研究数据。一切都是只读的,所以都可以共享。每个工作人员 i 1...N 对照所有其他 'future' 列表检查其列表 i+1...N
每个工人 i 为其 i+1...N 列表维护一点 table它的项目命中任何未来的项目。
当每个人都完成后,worker i 将它的位 table 发送回 master,在那里 tit 可以被 ANDed。然后零被删除。没有排序没有集合。检查速度不快,不过。
如果你不想打扰多个位 tables 你可以让每个工人 i 在他们发现自己区域上方的 dup 时写零责任。但是,现在您 运行 陷入 真正的共享内存问题 。就此而言,您甚至可以让每个作品只删除其区域上方的 dup,但同上。
连分工都回避了问题。对于每个工作人员来说,为自己的每个条目遍历其他人的列表是很昂贵的。 *(N-1)len(地区)/2。每个 worker 都可以创建一组它的区域,或者对它的区域进行排序。两者都可以加快检查速度,但成本会增加。
我怀疑即使您的最大列表也足够大,以至于多处理会改善时间安排。使用 numpy 和 多线程 可能是你最好的机会。
多处理引入了相当多的开销并增加了内存消耗,就像前面正确提到的 @Frank Merrow 一样。 但是,多线程并非如此(扩展)。重要的是不要混淆这些术语,因为进程和线程是不同的。 同一个进程中的线程共享它们的内存,不同的进程不共享。
在 Python 中使用多核的问题是 GIL,它不允许多个线程(在同一进程中)并行执行 Python 字节码。像 numpy 这样的一些 C 扩展可以释放 GIL,这可以从多线程的多核并行中获益。这是您仅通过使用 numpy 就可以在重大改进之上获得一些速度的机会。
from multiprocessing.dummy import Pool # .dummy uses threads
import numpy as np
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8
result = np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
使用 numpy 和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一。 最终删除重组数组的重复项是必要的,因为在子数组中只能识别 local 重复项。
对于低熵数据(许多重复项)使用pandas.unique
instead of numpy.unique
can be much faster. Unlike numpy.unique
它也保留了出现顺序。
请注意,只有在 numpy 函数 尚未多线程 under the hood 通过调用低级数学时,使用像上面这样的线程池才有意义图书馆。因此,请始终测试它是否真的提高了性能,不要认为这是理所当然的。
使用以下范围内的 100M 个随机生成的整数进行测试:
- 高熵:0 - 25_000_000_000(199560 次重复)
- 低熵:0 - 1000
代码
import time
import timeit
from multiprocessing.dummy import Pool # .dummy uses threads
import numpy as np
import pandas as pd
def time_stmt(stmt, title=None):
t = timeit.repeat(
stmt=stmt,
timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
)
print(f"\t{title or stmt}")
print(f"\t\t{min(t) / 1e9:.2f} s")
if __name__ == '__main__':
n_threads = 8 # machine with 8 cores (4 physical cores)
stmt_np_unique_pool = \
"""
np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""
stmt_pd_unique_pool = \
"""
pd.unique(np.concatenate(
Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
# -------------------------------------------------------------------------
print(f"\nhigh entropy (few duplicates) {'-' * 30}\n")
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
# ---
print(f"\nlow entropy (many duplicates) {'-' * 30}\n")
r = np.random.RandomState(42).randint(0, 1000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique() & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
就像您在下面的计时中看到的那样,仅使用不带多线程的 numpy 已经带来了最大的性能改进。另请注意,pandas.unique()
比 numpy.unique()
(仅)快于许多重复项。
high entropy (few duplicates) ------------------------------
list(set(r))
32.76 s
np.unique(r).tolist()
12.32 s
pd.unique(r).tolist()
23.01 s
numpy.unique() & Pool
9.75 s
pandas.unique() & Pool
28.91 s
low entropy (many duplicates) ------------------------------
list(set(r))
5.66 s
np.unique(r).tolist()
4.59 s
pd.unique(r).tolist()
0.75 s
numpy.unique() & Pool
1.17 s
pandas.unique() & Pool
0.19 s