使用 Spark Streaming+Kafka 的 HDFS 中的空文件夹

Empty folder in HDFS using Spark Streaming+Kafka

我正在使用 Spark Streaming + Kafka 将数据引入 HDFS。

val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
  .map(_._2)

每 30 秒,Kafka 队列中的所有数据将存储在 HDFS 中的一个单独文件夹中。一些文件夹包含一个名为 part-00000 的空文件,因为在相应的批处理间隔(30 秒)内没有数据。 我使用以下行来过滤这些文件夹:

messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")

但它不起作用,它仍然会生成包含空文件的文件夹。

您可以检查分区是否为空,如果不是则只保存RDD,就像这样。此代码应防止保存空 RDD。

messageRecBased.partitions.isEmpty

如果您查看 DStream.saveAsTextFiles() 方法定义,它只是将 RDD.saveAsObjectFile 生成到 DirectKafkaInputDStream 中的每个 RDD

  def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsObjectFile(file)
    }
    this.foreachRDD(saveFunc)
  }

因此,除了使用 DStream.saveAsTextFiles(),您还可以选择编写自己的内容,例如:

messageRecBased.foreachRDD{ rdd =>
    rdd.repartition(1)
    if(!rdd.isEmpty)
        rdd.saveAsObjectFile("FILE_PATH")
}

这就是我创建新目录并避免空批次的方法。

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime

   messageRecBased.foreachRDD{ rdd =>
        rdd.repartition(1)
        val eachRdd = rdd.map(record => record.value)
        if(!eachRdd.isEmpty)
          eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
      }