将kafka的Spark批量流式传输到单个文件中
Spark batch-streaming of kafka into single file
我正在使用批处理流 (maxRatePerPartition 10.000) 从 Kafka 流式传输数据。所以在每批中我处理 10.000 条 kafka 消息。
在这个批处理中 运行 我通过从 rdd 创建一个数据帧来处理每条消息。处理后,我将每条处理过的记录保存到同一个文件中:dataFrame.write.mode(SaveMode.append)。
所以它将所有消息附加到同一个文件。
只要在一批 运行 内 运行 就可以了。但是在执行下一批 运行 后(处理接下来的 10.000 条消息),它会为接下来的 10.000 条消息创建一个新文件。
现在的问题是:每个文件(块)保留文件系统的 50mb,但只包含大约 1mb(10.000 条消息)。
与其每批 运行 创建新文件,我宁愿将其全部附加到一个文件,只要它不超过 50mb。
你知道怎么做吗?或者为什么它在我的例子中不起作用?您可以在这里查看我的编码:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.Set
object SparkStreaming extends Constants {
def main(args: Array[String]) {
//create a new Spark configuration...
val conf = new SparkConf()
.setMaster("local[2]") // ...using 2 cores
.setAppName("Streaming")
.set("spark.streaming.kafka.maxRatePerPartition", "10000") //... processing max. 10000 messages per second
//create a streaming context for micro batch
val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above.)
//Setup up Kafka DStream
val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
"auto.offset.reset" -> "smallest") //Start from the beginning
val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)
val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
kafkaParams, kafkaTopics)
val records = directKafkaStream.map(Source => StreamingFunctions.transformAvroSource(Source))
records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton
import sqlContext.implicits._
val dataFrame = rdd.toDF()
dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
println(s"Written entries: ${dataFrame.count()}")
}
)
//start streaming until the process is killed
ssc.start()
ssc.awaitTermination()
}
/** Case class for converting RDD to DataFrame */
case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
}
我很乐意听取您的意见。
谢谢,亚历克斯
这可以通过使用 coalesce
函数然后覆盖现有文件来完成。
但是正如线程Spark coalesce looses file when program is aborted中所讨论的那样,当程序被中断时会出现错误。
所以暂时看来实现这样的逻辑还不够
我正在使用批处理流 (maxRatePerPartition 10.000) 从 Kafka 流式传输数据。所以在每批中我处理 10.000 条 kafka 消息。
在这个批处理中 运行 我通过从 rdd 创建一个数据帧来处理每条消息。处理后,我将每条处理过的记录保存到同一个文件中:dataFrame.write.mode(SaveMode.append)。 所以它将所有消息附加到同一个文件。
只要在一批 运行 内 运行 就可以了。但是在执行下一批 运行 后(处理接下来的 10.000 条消息),它会为接下来的 10.000 条消息创建一个新文件。
现在的问题是:每个文件(块)保留文件系统的 50mb,但只包含大约 1mb(10.000 条消息)。 与其每批 运行 创建新文件,我宁愿将其全部附加到一个文件,只要它不超过 50mb。
你知道怎么做吗?或者为什么它在我的例子中不起作用?您可以在这里查看我的编码:
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.Set
object SparkStreaming extends Constants {
def main(args: Array[String]) {
//create a new Spark configuration...
val conf = new SparkConf()
.setMaster("local[2]") // ...using 2 cores
.setAppName("Streaming")
.set("spark.streaming.kafka.maxRatePerPartition", "10000") //... processing max. 10000 messages per second
//create a streaming context for micro batch
val ssc = new StreamingContext(conf, Seconds(1)) //Note: processing max. 1*10000 messages (see config above.)
//Setup up Kafka DStream
val kafkaParams = Map("metadata.broker.list" -> "sandbox.hortonworks.com:6667",
"auto.offset.reset" -> "smallest") //Start from the beginning
val kafkaTopics = Set(KAFKA_TOPIC_PARQUET)
val directKafkaStream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,
kafkaParams, kafkaTopics)
val records = directKafkaStream.map(Source => StreamingFunctions.transformAvroSource(Source))
records.foreachRDD((rdd: RDD[TimeseriesRddRecord], time: Time) => {
val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) // Worker node singleton
import sqlContext.implicits._
val dataFrame = rdd.toDF()
dataFrame.write.mode(SaveMode.Append).partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*).parquet(PARQUET_FILE_PATH_TIMESERIES_LOCAL)
println(s"Written entries: ${dataFrame.count()}")
}
)
//start streaming until the process is killed
ssc.start()
ssc.awaitTermination()
}
/** Case class for converting RDD to DataFrame */
case class DataFrameRecord(thingId: String, timestamp: Long, propertyName: String, propertyValue: Double)
/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
}
我很乐意听取您的意见。 谢谢,亚历克斯
这可以通过使用 coalesce
函数然后覆盖现有文件来完成。
但是正如线程Spark coalesce looses file when program is aborted中所讨论的那样,当程序被中断时会出现错误。
所以暂时看来实现这样的逻辑还不够