Write different grouped RDD values to one file

I have a grouped RDD with key = String and value = Iterable<String>.

The values actually hold JSON data as Strings, and the grouping key has the format <tenant_id>/<year>/<month>.

I want to save this RDD to HDFS based on the key, with exactly one output file per key.

Example: if my grouped RDD contains the following keys

tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json

then I should have three files in my HDFS:

tenant1/2016/12/output_data.json
tenant1/2017/01/output_data.json
tenant1/2017/02/output_data.json
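
For context, a grouped RDD shaped like this could come from a groupByKey over (key, JSON-string) pairs. The sketch below is purely illustrative; sc and the sample records are hypothetical:

import org.apache.spark.rdd.RDD

// Hypothetical (key, json) pairs keyed by <tenant_id>/<year>/<month>/output_data.json
val records: RDD[(String, String)] = sc.parallelize(Seq(
  ("tenant1/2016/12/output_data.json", """{"field":"a"}"""),
  ("tenant1/2016/12/output_data.json", """{"field":"b"}"""),
  ("tenant1/2017/01/output_data.json", """{"field":"c"}""")
))

// key = String, value = Iterable[String], as described above
val groupedRDD: RDD[(String, Iterable[String])] = records.groupByKey()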

To achieve this, I tried the following:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Write only the value to the file and use the key as the output file path
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

import org.apache.spark.HashPartitioner

groupedRDD.partitionBy(new HashPartitioner(1))
    .saveAsHadoopFile("/user/pkhode/output/", classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

This gives the expected number of output files:

/user/pkhode/output/tenant1/2016/12/output_data.json
/user/pkhode/output/tenant1/2017/01/output_data.json
/user/pkhode/output/tenant1/2017/02/output_data.json

However, the data in each file should be one JSON string per line. Instead, the result looks like this:

List({json_object_in_string1}, {json_object_in_string2}, .....)

The expected result is:

{json_object_in_string1}
{json_object_in_string2}
.....

Can someone point me in the right direction on how to achieve this?

Update:

Thanks to @Tim P, I have updated my code as follows:

groupedRDD.partitionBy(new HashPartitioner(1000))
    .mapValues(_.mkString("\n"))
    .saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

This solution works as expected for smaller amounts of data, but when I try it with an input data set of around 20 GB, it gives me the following error during the mapValues stage:
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:462)
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:363)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:191)
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:184)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.twitter.chill.TraversableSerializer$$anonfun$write.apply(Traversable.scala:29)
    at com.twitter.chill.TraversableSerializer$$anonfun$write.apply(Traversable.scala:27)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
    at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:195)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:135)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
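
One possible way to avoid materializing each group as a single huge String is to flatten the groups back into individual (key, line) pairs and let MultipleTextOutputFormat route records with the same key into the same file. A minimal sketch, assuming groupedRDD, outputPath and RDDMultipleTextOutputFormat from the snippets above:

groupedRDD
  .flatMap { case (key, values) => values.map(v => (key, v)) } // one small record per JSON string
  .partitionBy(new HashPartitioner(1000))                      // all records for a key land in the same task
  .saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])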

When Spark saves an RDD as a text file, it simply calls toString on the RDD elements. Try mapping the values to a String first:

rdd.mapValues(_.mkString("\n"))
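
To see where the List(...) form comes from, compare toString with mkString on a small Iterable (an illustrative snippet only):

val values: Iterable[String] = List("""{"a":1}""", """{"b":2}""")

values.toString        // List({"a":1}, {"b":2})  <- what ends up in the file
values.mkString("\n")  // one JSON string per line, as desired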

Instead of working with the grouped RDD, I converted my RDD into a PairRDD, as shown below:

import com.google.gson.GsonBuilder

val resultRDD = inputRDD.map(row => {
  // Serialize each row to a JSON string and key it by its target file path
  val gson = new GsonBuilder().serializeNulls.create
  val data = gson.toJson(row)
  val fileURL = s"${row.getTenantId}/${row.getYear}/${row.getMonth}/output_data.json"

  (fileURL, data)
})

Then I called saveAsHadoopFile to save the results into separate files, as shown below:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Write only the value to the file and use the key as the output file path
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

resultRDD.partitionBy(new HashPartitioner(1000))
    .saveAsHadoopFile(outputPath, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat])