如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误

How to use foreachPartition in Spark 2.2 to avoid Task Serialization error

我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。 我无法解决的唯一问题是在 ForeachWriter 中使用 kafkaProducer 时与 Task serialization problem 有关。 在我为 Spark 1.6 开发的代码的旧版本中,我使用 foreachPartition 并为每个分区定义 kafkaProducer 以避免任务序列化问题。 我怎样才能在 Spark 2.2 中做到这一点?

val df: Dataset[String] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test") 
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "true")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] 
      .map(_._2)

var mySet = spark.sparkContext.broadcast(Map(
  "metadataBrokerList"->metadataBrokerList,
  "outputKafkaTopic"->outputKafkaTopic,
  "batchSize"->batchSize,
  "lingerMS"->lingerMS))

val kafkaProducer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                mySet.value("batchSize"),
                                mySet.value("lingerMS"))

val writer = new ForeachWriter[String] {

    override def process(row: String): Unit = {
         // val result = ...
         val record = new ProducerRecord[String, String](mySet.value("outputKafkaTopic"), "1", result);
        kafkaProducer.send(record)
    }

    override def close(errorOrNull: Throwable): Unit = {}

    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
}

val query = df
        .writeStream
        .foreach(writer)
        .start

query.awaitTermination()

spark.stop()

编写并使用 ForeachWriter 的实现。 (避免使用不可序列化对象的匿名 类 - 在您的情况下是 ProducerRecord)
示例:val writer = new YourForeachWriter[String]
这里还有一篇关于 Spark 序列化问题的有用文章:https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error