Google 云存储的 S3Guard 或 s3committer
S3Guard or s3committer for Google Cloud Storage
我在 Google Cloud Platform 上使用 Dataproc 和 Parquet,数据在 GCS 上,写入大量中小型文件是一个主要的麻烦,比我得到的要慢几倍使用较小的文件或 HDFS。
Hadoop 社区一直致力于 S3Guard,它使用 DynamoDB for S3A。类似地,s3committer 使用 S3 的多部分 API 来提供一个更高效的简单替代提交程序。
我正在 GCS 上寻找类似的解决方案。来自 S3 的多部分 API 是 GCS 的 XML API 不提供的少数东西之一,因此不能按原样使用。相反,GCS 有一个 "combine" API ,您可以在其中单独上传文件,然后发出合并查询。这似乎可以用来适应 s3committer
的多部分上传,但我不太确定。
我找不到任何关于在 GCS 上使用 S3Guard 和备用键值存储的信息(以及 S3A 连接器——甚至不确定它是否可以与 GCS 一起使用 XML API) .
0-重命名提交似乎是 Hadoop 和 Apache Spark 的常见问题。除了 "writing less, bigger files" 之外,GCS 上通常的解决方案是什么?
这里有一些不同的东西在起作用。对于强制执行列表一致性的问题,Dataproc 传统上依赖于每个集群的 NFS 挂载来应用客户端强制执行的写入后列表一致性;最近,Google Cloud Storage 已设法改进其写入后列表一致性语义,现在列表操作在所有写入后立即具有强一致性。 Dataproc 正在逐步取消客户端强制一致性,GCS 不再需要 DynamoDB 上的 S3Guard 之类的东西。
至于分段上传,理论上可以像您提到的那样使用 GCS Compose,但在大多数情况下,单个大文件的并行分段上传在单流情况下最有帮助,而大多数 Hadoop/Spark 工作负载已经在每台机器上并行执行不同的任务,因此对每个单独的上传流进行多线程处理是无益的;无论有没有并行分段上传,总吞吐量将大致相同。
这样就留下了使用多部分 API 来执行 conditional/atomic 提交的问题。 Hadoop 的 GCS 连接器目前确实使用了一种叫做 "resumable uploads" 的东西,理论上一个节点可以负责 "committing" 一个由完全不同的节点上传的对象;客户端库目前的结构还不足以让这一切变得非常简单。然而,与此同时,GCS "rename" 的 "copy-and-delete" 阶段也与 S3 不同,因为它是作为元数据操作而不是真正的数据 "copy" 完成的。这使得 GCS 可以使用 vanilla Hadoop FileCommitter,而不是需要将 "directly" 提交到最终位置并跳过“_temporary”机制。必须 "copy/delete" 每个文件的元数据而不是真正的目录重命名可能并不理想,但它也与基础数据大小不成正比,仅与文件数量成正比。
当然,这一切仍然没有解决提交大量小文件效率低下的问题。但是,它确实使 "direct commit" 方面可能不像您想象的那么重要;更常见的是,更大的问题是 Hive 在完成时没有并行化文件提交,尤其是在提交到大量分区目录时。 Spark 在这方面做得更好,Hive 应该会随着时间的推移而改进。
最近使用 native SSL library in Dataproc 1.2 提高了性能,您无需 "write less, bigger files" 即可尝试,只需使用开箱即用的 Dataproc 1.2。
否则,真正的解决方案确实涉及写入更少、更大的文件,因为即使您修复了写入端,如果您有太多小文件,您也会在读取端受到影响。 GCS 针对吞吐量进行了高度优化,因此与实际计算相比,任何小于 64MB 或 128MB 的数据都可能花费更多时间在启动任务和打开流的开销上(应该能够在 200ms-500ms 或所以)。
在这种情况下,您需要确保设置 hive.merge.mapfiles
、hive.merge.mapredfiles
或 hive.merge.tezfiles
之类的设置(如果您正在使用这些设置),或者重新分区您的 Spark 数据帧在保存到 GCS 之前;合并到更大的分区通常是非常值得的,因为它可以让您的文件易于管理并从持续的更快读取中获益。
编辑:我忘记提及的一件事是,我一直在松散地使用术语 repartition
,但在这种情况下,由于我们严格尝试将文件打包成更大的文件,您可能用 coalesce
做的更好;在另一个 .
中有更多讨论
S3Guard,HADOOP-13345 通过让 DynamoDB 存储列表来改进 S3 的一致性。这首次使可靠地使用 S3A 作为工作的直接目标成为可能。如果没有它,执行时间 可能 似乎是问题所在,但真正的问题是基于重命名的提交者可能会得到不一致的列表,甚至看不到它必须重命名的文件。
S3Guard Committer 工作 HADOOP-13786 将在完成后(截至 2017 年 8 月,仍在进行中)提供两个提交者。
暂存提交者
- 工作人员写入本地文件系统
- 任务提交者上传到S3但没有完成操作。相反,它将提交元信息保存到 HDFS。
- 此提交元信息在 HDFS 中作为正常 task/job 数据提交。
- 在作业提交中,提交者从 HDFS 读取待处理提交的数据并完成它们,然后清理任何未完成的提交。
任务提交是一次所有数据的上传,时间为O(data/bandwidth
).
这是基于 Ryan 在 Netflix 的 s3committer,一开始玩起来最安全。
魔法提交者
调用是因为它 "magic" 在文件系统中。
- 文件系统本身识别像
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
这样的路径
- 将写入重定向到
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
;未完成流 close()
调用中的写入。
- 将提交元信息保存到 s3a,此处
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy.pending
任务提交:从该目录加载所有 .pending 文件,聚合,保存在别处。时间是O(files)
;数据大小不重要。
任务中止:加载所有 .pending 文件,中止提交
- 作业提交:从已提交的任务中加载所有挂起的文件,完成。
因为它是在 S3 中列出文件,所以需要 S3Guard 来提供 AWS S3 上的一致性(其他 S3 实现开箱即用,所以不需要它)。
两位提交者共享相同的代码库,他们的工作提交将是 O(files/threads)
,因为他们都是短 POST 请求,不占用带宽或太多时间。
在测试中,对于小型测试规模文件,登台提交器比魔术提交器更快,因为魔术提交器与 S3 交谈更多,速度很慢......尽管 S3Guard 速度 listing/getFileStatus 调用。您写入的数据越多,临时提交者提交的任务提交时间就越长,而魔术任务的任务提交对于相同数量的文件来说是不变的。两者都比使用 rename() 更快,因为它是如何被列表模仿的,复制
GCS 和 Hadoop/Spark 提交算法
(我这里没有看GCS代码,所以保留错误的权利。以Dennis Huo的说法为权威)
如果 GCS 的 rename() 比 S3A copy-then-delete 更有效,它应该更快,O(file) 比 O(data) 多,这取决于代码中的并行化。
我不知道他们是否可以寻找 0 重命名提交者。 FileOutputFormat
下 mapreduce 代码的更改旨在支持 different/pluggable 不同文件系统的提交者,因此他们有机会在这里做点什么。
现在,请确保您使用的是 v2 MR 提交算法,该算法虽然对故障的恢复能力较差,但至少会将重命名推入任务提交,而不是作业提交。
我在 Google Cloud Platform 上使用 Dataproc 和 Parquet,数据在 GCS 上,写入大量中小型文件是一个主要的麻烦,比我得到的要慢几倍使用较小的文件或 HDFS。
Hadoop 社区一直致力于 S3Guard,它使用 DynamoDB for S3A。类似地,s3committer 使用 S3 的多部分 API 来提供一个更高效的简单替代提交程序。
我正在 GCS 上寻找类似的解决方案。来自 S3 的多部分 API 是 GCS 的 XML API 不提供的少数东西之一,因此不能按原样使用。相反,GCS 有一个 "combine" API ,您可以在其中单独上传文件,然后发出合并查询。这似乎可以用来适应 s3committer
的多部分上传,但我不太确定。
我找不到任何关于在 GCS 上使用 S3Guard 和备用键值存储的信息(以及 S3A 连接器——甚至不确定它是否可以与 GCS 一起使用 XML API) .
0-重命名提交似乎是 Hadoop 和 Apache Spark 的常见问题。除了 "writing less, bigger files" 之外,GCS 上通常的解决方案是什么?
这里有一些不同的东西在起作用。对于强制执行列表一致性的问题,Dataproc 传统上依赖于每个集群的 NFS 挂载来应用客户端强制执行的写入后列表一致性;最近,Google Cloud Storage 已设法改进其写入后列表一致性语义,现在列表操作在所有写入后立即具有强一致性。 Dataproc 正在逐步取消客户端强制一致性,GCS 不再需要 DynamoDB 上的 S3Guard 之类的东西。
至于分段上传,理论上可以像您提到的那样使用 GCS Compose,但在大多数情况下,单个大文件的并行分段上传在单流情况下最有帮助,而大多数 Hadoop/Spark 工作负载已经在每台机器上并行执行不同的任务,因此对每个单独的上传流进行多线程处理是无益的;无论有没有并行分段上传,总吞吐量将大致相同。
这样就留下了使用多部分 API 来执行 conditional/atomic 提交的问题。 Hadoop 的 GCS 连接器目前确实使用了一种叫做 "resumable uploads" 的东西,理论上一个节点可以负责 "committing" 一个由完全不同的节点上传的对象;客户端库目前的结构还不足以让这一切变得非常简单。然而,与此同时,GCS "rename" 的 "copy-and-delete" 阶段也与 S3 不同,因为它是作为元数据操作而不是真正的数据 "copy" 完成的。这使得 GCS 可以使用 vanilla Hadoop FileCommitter,而不是需要将 "directly" 提交到最终位置并跳过“_temporary”机制。必须 "copy/delete" 每个文件的元数据而不是真正的目录重命名可能并不理想,但它也与基础数据大小不成正比,仅与文件数量成正比。
当然,这一切仍然没有解决提交大量小文件效率低下的问题。但是,它确实使 "direct commit" 方面可能不像您想象的那么重要;更常见的是,更大的问题是 Hive 在完成时没有并行化文件提交,尤其是在提交到大量分区目录时。 Spark 在这方面做得更好,Hive 应该会随着时间的推移而改进。
最近使用 native SSL library in Dataproc 1.2 提高了性能,您无需 "write less, bigger files" 即可尝试,只需使用开箱即用的 Dataproc 1.2。
否则,真正的解决方案确实涉及写入更少、更大的文件,因为即使您修复了写入端,如果您有太多小文件,您也会在读取端受到影响。 GCS 针对吞吐量进行了高度优化,因此与实际计算相比,任何小于 64MB 或 128MB 的数据都可能花费更多时间在启动任务和打开流的开销上(应该能够在 200ms-500ms 或所以)。
在这种情况下,您需要确保设置 hive.merge.mapfiles
、hive.merge.mapredfiles
或 hive.merge.tezfiles
之类的设置(如果您正在使用这些设置),或者重新分区您的 Spark 数据帧在保存到 GCS 之前;合并到更大的分区通常是非常值得的,因为它可以让您的文件易于管理并从持续的更快读取中获益。
编辑:我忘记提及的一件事是,我一直在松散地使用术语 repartition
,但在这种情况下,由于我们严格尝试将文件打包成更大的文件,您可能用 coalesce
做的更好;在另一个
S3Guard,HADOOP-13345 通过让 DynamoDB 存储列表来改进 S3 的一致性。这首次使可靠地使用 S3A 作为工作的直接目标成为可能。如果没有它,执行时间 可能 似乎是问题所在,但真正的问题是基于重命名的提交者可能会得到不一致的列表,甚至看不到它必须重命名的文件。
S3Guard Committer 工作 HADOOP-13786 将在完成后(截至 2017 年 8 月,仍在进行中)提供两个提交者。
暂存提交者
- 工作人员写入本地文件系统
- 任务提交者上传到S3但没有完成操作。相反,它将提交元信息保存到 HDFS。
- 此提交元信息在 HDFS 中作为正常 task/job 数据提交。
- 在作业提交中,提交者从 HDFS 读取待处理提交的数据并完成它们,然后清理任何未完成的提交。
任务提交是一次所有数据的上传,时间为O(data/bandwidth
).
这是基于 Ryan 在 Netflix 的 s3committer,一开始玩起来最安全。
魔法提交者
调用是因为它 "magic" 在文件系统中。
- 文件系统本身识别像
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
这样的路径
- 将写入重定向到
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy
;未完成流close()
调用中的写入。 - 将提交元信息保存到 s3a,此处
s3a://dest/__magic/job044/task001/__base/part-000.orc.snappy.pending
任务提交:从该目录加载所有 .pending 文件,聚合,保存在别处。时间是
O(files)
;数据大小不重要。任务中止:加载所有 .pending 文件,中止提交
- 作业提交:从已提交的任务中加载所有挂起的文件,完成。
因为它是在 S3 中列出文件,所以需要 S3Guard 来提供 AWS S3 上的一致性(其他 S3 实现开箱即用,所以不需要它)。
两位提交者共享相同的代码库,他们的工作提交将是 O(files/threads)
,因为他们都是短 POST 请求,不占用带宽或太多时间。
在测试中,对于小型测试规模文件,登台提交器比魔术提交器更快,因为魔术提交器与 S3 交谈更多,速度很慢......尽管 S3Guard 速度 listing/getFileStatus 调用。您写入的数据越多,临时提交者提交的任务提交时间就越长,而魔术任务的任务提交对于相同数量的文件来说是不变的。两者都比使用 rename() 更快,因为它是如何被列表模仿的,复制
GCS 和 Hadoop/Spark 提交算法
(我这里没有看GCS代码,所以保留错误的权利。以Dennis Huo的说法为权威)
如果 GCS 的 rename() 比 S3A copy-then-delete 更有效,它应该更快,O(file) 比 O(data) 多,这取决于代码中的并行化。
我不知道他们是否可以寻找 0 重命名提交者。 FileOutputFormat
下 mapreduce 代码的更改旨在支持 different/pluggable 不同文件系统的提交者,因此他们有机会在这里做点什么。
现在,请确保您使用的是 v2 MR 提交算法,该算法虽然对故障的恢复能力较差,但至少会将重命名推入任务提交,而不是作业提交。