使用 Apache Spark 写入 HDFS 时的输出序列
Output Sequence while writing to HDFS using Apache Spark
我正在 apache Spark 中开发一个项目,要求将 spark 的处理后输出写入特定格式,如 Header -> Data -> Trailer
。为了写入 HDFS,我使用 .saveAsHadoopFile
方法并使用密钥作为文件名将数据写入多个文件。但问题是数据的顺序未维护文件写入 Data->Header->Trailer
或三个的不同组合。 RDD 转换有什么我遗漏的吗?
好的,在阅读了 google 的 Whosebug 问题、博客和邮件存档之后。我发现了 .union()
和其他转换的工作原理以及分区的管理方式。当我们使用 .union()
时,分区信息会被生成的 RDD 和排序丢失,这就是我的输出序列没有得到维护的原因。
我为解决这个问题所做的是对记录进行编号,例如
页眉 = 1,正文 = 2,页脚 = 3
所以在 RDD 上使用 sortBy
,它是所有三个的并集,我使用这个订单号和 1 个分区对它进行了排序。之后使用密钥作为文件名写入多个文件我使用了 HashPartitioner 这样相同的密钥数据应该进入单独的文件。
val header: RDD[(String,(String,Int))] = ... // this is my header RDD`
val data: RDD[(String,(String,Int))] = ... // this is my data RDD
val footer: RDD[(String,(String,Int))] = ... // this is my footer RDD
val finalRDD: [(String,String)] = header.union(data).union(footer).sortBy(x=>x._2._2,true,1).map(x => (x._1,x._2._1))
val output: RDD[(String,String)] = new PairRDDFunctions[String,String](finalRDD).partitionBy(new HashPartitioner(num))
output.saveAsHadoopFile ... // and using MultipleTextOutputFormat save to multiple file using key as filename
这可能不是最终或最经济的解决方案,但它确实有效。我也在尝试寻找其他方法来将输出序列保持为 Header->Body->Footer
。我还在所有三个 RDD 上尝试了 .coalesce(1)
,然后进行联合,但这只是向 RDD 添加了三个转换,.sortBy
函数也采用了我认为相同的分区信息,但首先合并了 RDD也工作了。如果有人有其他方法请告诉我,或者添加更多内容将非常有帮助,因为我是 Spark
的新手
参考文献:
Write to multiple outputs by key Spark - one Spark job
我正在 apache Spark 中开发一个项目,要求将 spark 的处理后输出写入特定格式,如 Header -> Data -> Trailer
。为了写入 HDFS,我使用 .saveAsHadoopFile
方法并使用密钥作为文件名将数据写入多个文件。但问题是数据的顺序未维护文件写入 Data->Header->Trailer
或三个的不同组合。 RDD 转换有什么我遗漏的吗?
好的,在阅读了 google 的 Whosebug 问题、博客和邮件存档之后。我发现了 .union()
和其他转换的工作原理以及分区的管理方式。当我们使用 .union()
时,分区信息会被生成的 RDD 和排序丢失,这就是我的输出序列没有得到维护的原因。
我为解决这个问题所做的是对记录进行编号,例如
页眉 = 1,正文 = 2,页脚 = 3
所以在 RDD 上使用 sortBy
,它是所有三个的并集,我使用这个订单号和 1 个分区对它进行了排序。之后使用密钥作为文件名写入多个文件我使用了 HashPartitioner 这样相同的密钥数据应该进入单独的文件。
val header: RDD[(String,(String,Int))] = ... // this is my header RDD`
val data: RDD[(String,(String,Int))] = ... // this is my data RDD
val footer: RDD[(String,(String,Int))] = ... // this is my footer RDD
val finalRDD: [(String,String)] = header.union(data).union(footer).sortBy(x=>x._2._2,true,1).map(x => (x._1,x._2._1))
val output: RDD[(String,String)] = new PairRDDFunctions[String,String](finalRDD).partitionBy(new HashPartitioner(num))
output.saveAsHadoopFile ... // and using MultipleTextOutputFormat save to multiple file using key as filename
这可能不是最终或最经济的解决方案,但它确实有效。我也在尝试寻找其他方法来将输出序列保持为 Header->Body->Footer
。我还在所有三个 RDD 上尝试了 .coalesce(1)
,然后进行联合,但这只是向 RDD 添加了三个转换,.sortBy
函数也采用了我认为相同的分区信息,但首先合并了 RDD也工作了。如果有人有其他方法请告诉我,或者添加更多内容将非常有帮助,因为我是 Spark
参考文献:
Write to multiple outputs by key Spark - one Spark job