K-way归并排序在多台主机上划分
K-way merge sort divided on multiple hosts
我有大约 8000 个文件,磁盘上有大约 6TB 的数据。每个文件都包含一个键值对列表,我希望将这些值合并到一个排序的键值对列表中(例如,如果键 A 出现在两个文件中,则合并文件包含键 A 一次并且该键包含来自两个文件的所有值)。
我已经实现了这个 k-way merge for a single core on a single host in Python [gist -- see 以获得对过程的直观概述]。我现在希望将工作分配到没有共享内存但可以共享网络访问的多个主机上。
我需要排序的键 space 绝对是巨大的,大约 26^24,但是数据中不存在绝大多数键(所以给每个 worker 没有意义一组与他们自己有关的键)。
其他人对如何实现分布式 k 路合并算法有任何想法吗?这让我觉得非常重要,但可能有一些我没有看到的容易实现的成果。其他人可以提供的任何指示将不胜感激。
备注
计算设置是可参数化的。我在两个计算集群上工作,每个集群都允许我同时使用 ~10-1000 个节点,每个节点有 12-24 个内核和 ~120GB RAM。在收到请求后,机器会在不确定的时间联机。网络通信通过 TCP 进行。磁盘是带有 AFS 文件系统的 SSD,存储空间充足。
此外,我正在使用一个简单的 Python 包 big-read 在任何给定时间仅将 8,000 个文件中的每个文件的 n
行读入 RAM,因此 RAM 管理"external sort" 已经很容易处理了...
高度相关:K-way 与 stxxl 合并。
如果您的比较方法不是很复杂,瓶颈很可能是文件 IO。当您通过网络而不是在本地硬盘驱动器上执行此操作时,情况会变得更糟。 (但你只能通过分析才能确定)
我确定文件 IO 是您的瓶颈(但您只能在分析后才能确定)。
我会推荐:
- 将大块数据加载到 RAM 中(尽可能大)对每个块使用快速排序以在 RAM 中对其进行排序并将其作为每个块一个文件写入磁盘。
- 使用你的 k-way 合并来合并这个大的排序文件。
这 8000 个文件中的每一个文件首先需要按键排序还是已经按键排序?如果 8000 首先需要按键排序,则初始阶段将受到 CPU 约束。这个对文件进行排序的初始阶段可以并行完成(和多线程,例如 gnu 排序)。在这一点之后,在合并步骤中,进程通常成为文件 I/O 绑定,但是如果文件 I/O 与 SSD 可以独立完成,那么合并阶段也可以并行完成,使用SSD 组。最终,生成单个排序文件的最终合并将被文件 I/O 绑定,尝试并行实现它没有任何优势。
这是一个已经解决的问题。大多数 mapreduce 框架,例如 Hadoop,都在底层进行分布式排序。最好的将具有完整的逻辑来检测故障机器,将它们取出并重做它们的工作。 (当你大规模使用大型分布式系统时,补偿机器故障很重要。)找到一个好的框架并使用它,而不是重新发明轮子。
至于他们如何排序,我知道标准方法是合并排序。起初你分发的工作块看起来像 "Sort this block." 然后你开始分发工作块看起来像 "Merge these chunks together." 当你要合并的块不适合单个电脑。然后你需要取一组块,并找出将其分区的位置,然后合并这些块。我不确定他们是如何做到这一点的。我最好的即兴想法是像每千分之一的元素子选择一样,对其进行排序,对其进行分区,并告诉每台保存完整数据的机器将其数据集切割成块的位置,以及将数据发送给谁合并。
无论如何完成,您最终都会得到一组有序的机器,每台机器都有一个有序的数据部分,并且在它们之间您可以对所有数据进行排序。
重要提示: 在处理大型分布式数据集时,避免在任何地方造成瓶颈非常重要。隐式或显式。您从分布式数据开始。您以分布式方式处理它。你结束了分布式数据。总是。
分布式 sort/merge 与单个主机上的 sort/merge 非常相似。基本思想是在单独的主机之间拆分文件。让每个主机对其单独的文件进行排序,然后开始我在 中描述的合并操作。因此每个主机都有一个优先级队列,其中包含它排序的每个文件的下一个项目。
其中一个主机维护一个优先级队列,其中包含来自其他每个主机的下一个项目。它从该队列中选择第一个,输出它,并轮询它来自的主机以获取下一个项目,并将其插入优先级队列并继续。
它是优先级队列中的优先级队列,分布在多台主机上。从图形上看,它看起来像这样:
Host1 Host2 Host3 Host4
------------------------------------------------------------------
F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 F11 F12 F13 F14 F15 F16
\ | | / \ | | / \ | | / \ | | /
---------- ---------- ------------ ------------
PQ1 PQ2 PQ3 PQ4
\ \ / /
\ \ / /
\ \ / /
\ \ / /
---------------\ /------------------
\ /
\ /
\ /
--
Master PQ
on primary host
现在,一次从各个主机请求单个项目是非常低效的。主要主机可以向每个主机请求 1,000 个项目并将它们保存在单独的缓冲区中。每当主机的缓冲区用完时,主要主机就会向主机请求另一个已满的缓冲区。这将减少网络流量。
这也减少了单个主机上的 I/O:您永远不必将合并的文件写入磁盘。您按照我之前的回答中的描述对单个文件进行排序并将它们写入磁盘,然后您开始在各个主机上进行合并并将项目发送到进行大合并的主要主机。
我有大约 8000 个文件,磁盘上有大约 6TB 的数据。每个文件都包含一个键值对列表,我希望将这些值合并到一个排序的键值对列表中(例如,如果键 A 出现在两个文件中,则合并文件包含键 A 一次并且该键包含来自两个文件的所有值)。
我已经实现了这个 k-way merge for a single core on a single host in Python [gist -- see
我需要排序的键 space 绝对是巨大的,大约 26^24,但是数据中不存在绝大多数键(所以给每个 worker 没有意义一组与他们自己有关的键)。
其他人对如何实现分布式 k 路合并算法有任何想法吗?这让我觉得非常重要,但可能有一些我没有看到的容易实现的成果。其他人可以提供的任何指示将不胜感激。
备注
计算设置是可参数化的。我在两个计算集群上工作,每个集群都允许我同时使用 ~10-1000 个节点,每个节点有 12-24 个内核和 ~120GB RAM。在收到请求后,机器会在不确定的时间联机。网络通信通过 TCP 进行。磁盘是带有 AFS 文件系统的 SSD,存储空间充足。
此外,我正在使用一个简单的 Python 包 big-read 在任何给定时间仅将 8,000 个文件中的每个文件的 n
行读入 RAM,因此 RAM 管理"external sort" 已经很容易处理了...
高度相关:K-way 与 stxxl 合并。
如果您的比较方法不是很复杂,瓶颈很可能是文件 IO。当您通过网络而不是在本地硬盘驱动器上执行此操作时,情况会变得更糟。 (但你只能通过分析才能确定)
我确定文件 IO 是您的瓶颈(但您只能在分析后才能确定)。
我会推荐:
- 将大块数据加载到 RAM 中(尽可能大)对每个块使用快速排序以在 RAM 中对其进行排序并将其作为每个块一个文件写入磁盘。
- 使用你的 k-way 合并来合并这个大的排序文件。
这 8000 个文件中的每一个文件首先需要按键排序还是已经按键排序?如果 8000 首先需要按键排序,则初始阶段将受到 CPU 约束。这个对文件进行排序的初始阶段可以并行完成(和多线程,例如 gnu 排序)。在这一点之后,在合并步骤中,进程通常成为文件 I/O 绑定,但是如果文件 I/O 与 SSD 可以独立完成,那么合并阶段也可以并行完成,使用SSD 组。最终,生成单个排序文件的最终合并将被文件 I/O 绑定,尝试并行实现它没有任何优势。
这是一个已经解决的问题。大多数 mapreduce 框架,例如 Hadoop,都在底层进行分布式排序。最好的将具有完整的逻辑来检测故障机器,将它们取出并重做它们的工作。 (当你大规模使用大型分布式系统时,补偿机器故障很重要。)找到一个好的框架并使用它,而不是重新发明轮子。
至于他们如何排序,我知道标准方法是合并排序。起初你分发的工作块看起来像 "Sort this block." 然后你开始分发工作块看起来像 "Merge these chunks together." 当你要合并的块不适合单个电脑。然后你需要取一组块,并找出将其分区的位置,然后合并这些块。我不确定他们是如何做到这一点的。我最好的即兴想法是像每千分之一的元素子选择一样,对其进行排序,对其进行分区,并告诉每台保存完整数据的机器将其数据集切割成块的位置,以及将数据发送给谁合并。
无论如何完成,您最终都会得到一组有序的机器,每台机器都有一个有序的数据部分,并且在它们之间您可以对所有数据进行排序。
重要提示: 在处理大型分布式数据集时,避免在任何地方造成瓶颈非常重要。隐式或显式。您从分布式数据开始。您以分布式方式处理它。你结束了分布式数据。总是。
分布式 sort/merge 与单个主机上的 sort/merge 非常相似。基本思想是在单独的主机之间拆分文件。让每个主机对其单独的文件进行排序,然后开始我在
其中一个主机维护一个优先级队列,其中包含来自其他每个主机的下一个项目。它从该队列中选择第一个,输出它,并轮询它来自的主机以获取下一个项目,并将其插入优先级队列并继续。
它是优先级队列中的优先级队列,分布在多台主机上。从图形上看,它看起来像这样:
Host1 Host2 Host3 Host4
------------------------------------------------------------------
F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 F11 F12 F13 F14 F15 F16
\ | | / \ | | / \ | | / \ | | /
---------- ---------- ------------ ------------
PQ1 PQ2 PQ3 PQ4
\ \ / /
\ \ / /
\ \ / /
\ \ / /
---------------\ /------------------
\ /
\ /
\ /
--
Master PQ
on primary host
现在,一次从各个主机请求单个项目是非常低效的。主要主机可以向每个主机请求 1,000 个项目并将它们保存在单独的缓冲区中。每当主机的缓冲区用完时,主要主机就会向主机请求另一个已满的缓冲区。这将减少网络流量。
这也减少了单个主机上的 I/O:您永远不必将合并的文件写入磁盘。您按照我之前的回答中的描述对单个文件进行排序并将它们写入磁盘,然后您开始在各个主机上进行合并并将项目发送到进行大合并的主要主机。