使用 Spark Streaming+Kafka 的 HDFS 中的空文件夹
Empty folder in HDFS using Spark Streaming+Kafka
我正在使用 Spark Streaming + Kafka 将数据引入 HDFS。
val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
.map(_._2)
每 30 秒,Kafka 队列中的所有数据将存储在 HDFS 中的一个单独文件夹中。一些文件夹包含一个名为 part-00000 的空文件,因为在相应的批处理间隔(30 秒)内没有数据。
我使用以下行来过滤这些文件夹:
messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")
但它不起作用,它仍然会生成包含空文件的文件夹。
您可以检查分区是否为空,如果不是则只保存RDD,就像这样。此代码应防止保存空 RDD。
messageRecBased.partitions.isEmpty
如果您查看 DStream.saveAsTextFiles() 方法定义,它只是将 RDD.saveAsObjectFile
生成到 DirectKafkaInputDStream
中的每个 RDD
。
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
this.foreachRDD(saveFunc)
}
因此,除了使用 DStream.saveAsTextFiles()
,您还可以选择编写自己的内容,例如:
messageRecBased.foreachRDD{ rdd =>
rdd.repartition(1)
if(!rdd.isEmpty)
rdd.saveAsObjectFile("FILE_PATH")
}
这就是我创建新目录并避免空批次的方法。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
messageRecBased.foreachRDD{ rdd =>
rdd.repartition(1)
val eachRdd = rdd.map(record => record.value)
if(!eachRdd.isEmpty)
eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
}
我正在使用 Spark Streaming + Kafka 将数据引入 HDFS。
val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
.map(_._2)
每 30 秒,Kafka 队列中的所有数据将存储在 HDFS 中的一个单独文件夹中。一些文件夹包含一个名为 part-00000 的空文件,因为在相应的批处理间隔(30 秒)内没有数据。 我使用以下行来过滤这些文件夹:
messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")
但它不起作用,它仍然会生成包含空文件的文件夹。
您可以检查分区是否为空,如果不是则只保存RDD,就像这样。此代码应防止保存空 RDD。
messageRecBased.partitions.isEmpty
如果您查看 DStream.saveAsTextFiles() 方法定义,它只是将 RDD.saveAsObjectFile
生成到 DirectKafkaInputDStream
中的每个 RDD
。
def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
val saveFunc = (rdd: RDD[T], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
this.foreachRDD(saveFunc)
}
因此,除了使用 DStream.saveAsTextFiles()
,您还可以选择编写自己的内容,例如:
messageRecBased.foreachRDD{ rdd =>
rdd.repartition(1)
if(!rdd.isEmpty)
rdd.saveAsObjectFile("FILE_PATH")
}
这就是我创建新目录并避免空批次的方法。
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
messageRecBased.foreachRDD{ rdd =>
rdd.repartition(1)
val eachRdd = rdd.map(record => record.value)
if(!eachRdd.isEmpty)
eachRdd.saveAsTextFile("hdfs/location/"+DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now)+"/")
}