如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误
How to use foreachPartition in Spark 2.2 to avoid Task Serialization error
我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。
我无法解决的唯一问题是在 ForeachWriter
中使用 kafkaProducer
时与 Task serialization problem
有关。
在我为 Spark 1.6 开发的代码的旧版本中,我使用 foreachPartition
并为每个分区定义 kafkaProducer
以避免任务序列化问题。
我怎样才能在 Spark 2.2 中做到这一点?
val df: Dataset[String] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.map(_._2)
var mySet = spark.sparkContext.broadcast(Map(
"metadataBrokerList"->metadataBrokerList,
"outputKafkaTopic"->outputKafkaTopic,
"batchSize"->batchSize,
"lingerMS"->lingerMS))
val kafkaProducer = Utils.createProducer(mySet.value("metadataBrokerList"),
mySet.value("batchSize"),
mySet.value("lingerMS"))
val writer = new ForeachWriter[String] {
override def process(row: String): Unit = {
// val result = ...
val record = new ProducerRecord[String, String](mySet.value("outputKafkaTopic"), "1", result);
kafkaProducer.send(record)
}
override def close(errorOrNull: Throwable): Unit = {}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}
val query = df
.writeStream
.foreach(writer)
.start
query.awaitTermination()
spark.stop()
编写并使用 ForeachWriter 的实现。 (避免使用不可序列化对象的匿名 类 - 在您的情况下是 ProducerRecord)
示例:val writer = new YourForeachWriter[String]
这里还有一篇关于 Spark 序列化问题的有用文章:https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error
我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。
我无法解决的唯一问题是在 ForeachWriter
中使用 kafkaProducer
时与 Task serialization problem
有关。
在我为 Spark 1.6 开发的代码的旧版本中,我使用 foreachPartition
并为每个分区定义 kafkaProducer
以避免任务序列化问题。
我怎样才能在 Spark 2.2 中做到这一点?
val df: Dataset[String] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.map(_._2)
var mySet = spark.sparkContext.broadcast(Map(
"metadataBrokerList"->metadataBrokerList,
"outputKafkaTopic"->outputKafkaTopic,
"batchSize"->batchSize,
"lingerMS"->lingerMS))
val kafkaProducer = Utils.createProducer(mySet.value("metadataBrokerList"),
mySet.value("batchSize"),
mySet.value("lingerMS"))
val writer = new ForeachWriter[String] {
override def process(row: String): Unit = {
// val result = ...
val record = new ProducerRecord[String, String](mySet.value("outputKafkaTopic"), "1", result);
kafkaProducer.send(record)
}
override def close(errorOrNull: Throwable): Unit = {}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}
val query = df
.writeStream
.foreach(writer)
.start
query.awaitTermination()
spark.stop()
编写并使用 ForeachWriter 的实现。 (避免使用不可序列化对象的匿名 类 - 在您的情况下是 ProducerRecord)
示例:val writer = new YourForeachWriter[String]
这里还有一篇关于 Spark 序列化问题的有用文章:https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error