Write different grouped RDD values to one file
I have a groupedRDD of type key = String and value = Iterable<String>. The values are JSON data stored as Strings, and the grouping key has the format <tenant_id>/<year>/<month>. I want to save this RDD to HDFS based on the key name, with exactly one output file per key.
Example: if my grouped RDD contains the following keys
tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json
then my HDFS should contain three files
tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json
To achieve this, I tried the following:
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

groupedRDD.partitionBy(new HashPartitioner(1))
  .saveAsHadoopFile("/user/pkhode/output/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
This gives the expected output files:
/user/pkhode/output/tenant1/2016/12/output_data.json
/user/pkhode/output/tenant1/2017/01/output_data.json
/user/pkhode/output/tenant1/2017/02/output_data.json
However, the data in these files should have each JSON string on its own line. Instead, the result looks like this:
List({json_object_in_string1}, {json_object_in_string2}, .....)
The expected result is:
{json_object_in_string1}
{json_object_in_string2}
.....
Can someone point out how I can achieve this?
Update:
Thanks to @Tim P, I have updated my code to the following:
groupedRDD
  .partitionBy(new HashPartitioner(1000))
  .mapValues(_.mkString("\n"))
  .saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])
This solution works as expected for smaller amounts of data, but when I try it with an input dataset of around 20 GB, it gives me the following error during the mapValues stage:
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:462)
at com.esotericsoftware.kryo.io.Output.writeString(Output.java:363)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:191)
at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:184)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at com.twitter.chill.TraversableSerializer$$anonfun$write.apply(Traversable.scala:29)
at com.twitter.chill.TraversableSerializer$$anonfun$write.apply(Traversable.scala:27)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:135)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
When Spark saves an RDD as a text file, it simply calls toString on the RDD elements. Try mapping the values to a String first:
rdd.mapValues(_.mkString("\n"))
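A minimal sketch (not from the original answer) of why the files contained List(...): the written value is whatever toString returns, and for an Iterable[String] that is the collection's own rendering rather than one element per line:

// Illustration only: toString of an Iterable vs. joining its elements with newlines.
val values: Iterable[String] = List("""{"a": 1}""", """{"b": 2}""")
println(values.toString)       // List({"a": 1}, {"b": 2})
println(values.mkString("\n")) // {"a": 1}
                               // {"b": 2}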
Instead of using the grouped RDD, I converted my RDD into a pair RDD, as follows:
import com.google.gson.GsonBuilder

val resultRDD = inputRDD.map(row => {
  // Serialize each row to a JSON string and pair it with its target file path.
  val gson = new GsonBuilder().serializeNulls.create
  val data = gson.toJson(row)
  val fileURL = s"${row.getTenantId}/${row.getYear}/${row.getMonth}/output_data.json"
  (fileURL, data)
})
Then I called saveAsHadoopFile to save the result into separate files, as follows:
class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

resultRDD
  .partitionBy(new HashPartitioner(1000))
  .saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat])
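A note on why this works at scale: each JSON record travels through the shuffle as its own (fileURL, line) pair, so no huge concatenated String per key has to be built or serialized, while RddMultiTextOutputFormat still routes every line to the file derived from its key. For completeness, here is a hypothetical sketch of the row type the map above assumes (the getTenantId/getYear/getMonth accessors appear in the snippet; the class itself is just an illustration):

// Hypothetical element type of inputRDD, only to make the snippet above self-contained.
case class InputRow(tenantId: String, year: String, month: String, payload: String) {
  def getTenantId: String = tenantId
  def getYear: String = year
  def getMonth: String = month
}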