Task not serializable after adding it to ForEachPartition

While trying to implement an Apache Pulsar sink for Spark Structured Streaming, I am getting a Task not serializable exception from Spark.

I have already tried extracting the PulsarConfig into a separate class and calling it inside the .foreachPartition lambda, which is what I normally do for JDBC connections and the other systems I integrate into Spark Structured Streaming, as shown below:

PulsarSink Class

import java.util.concurrent.TimeUnit

import org.apache.pulsar.client.api.{CompressionType, Schema}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.streaming.OutputMode

class PulsarSink(
                sqlContext: SQLContext,
                parameters: Map[String, String],
                partitionColumns: Seq[String],
                outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.toJSON.foreachPartition( partition => {
      // One client and producer per partition, created on the executor
      val pulsarConfig = new PulsarConfig(parameters).client
      val producer = pulsarConfig.newProducer(Schema.STRING)
        .topic(parameters.get("topic").get)
        .compressionType(CompressionType.LZ4)
        .sendTimeout(0, TimeUnit.SECONDS)
        .create
      partition.foreach(rec => producer.send(rec))
      producer.flush()
    })
  }
}
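For context, a custom Sink with this exact constructor signature is normally handed its arguments by Spark 2.x through a StreamSinkProvider. The post does not show that wiring, so the sketch below is an assumption; the class name PulsarSinkProvider and the short name "pulsar" are hypothetical:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical wiring: Spark calls createSink with the options from the
// writeStream query, which is where the parameters map originates.
class PulsarSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def shortName(): String = "pulsar"

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink =
    new PulsarSink(sqlContext, parameters, partitionColumns, outputMode)
}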

PulsarConfig Class

import org.apache.pulsar.client.api.{Authentication, AuthenticationFactory, PulsarClient}
import org.apache.pulsar.client.impl.auth.AuthenticationTls

class PulsarConfig(parameters: Map[String, String]) {

  def client(): PulsarClient = {
    import scala.collection.JavaConverters._

    // Build a TLS-authenticated client only when both TLS options are present
    if(parameters.get("tlscert").isDefined && parameters.get("tlskey").isDefined) {
      val tlsAuthMap = Map("tlsCertFile" -> parameters.get("tlscert").get,
        "tlsKeyFile" -> parameters.get("tlskey").get).asJava

      val tlsAuth: Authentication = AuthenticationFactory.create(classOf[AuthenticationTls].getName, tlsAuthMap)
      PulsarClient.builder
        .serviceUrl(parameters.get("broker").get)
        .tlsTrustCertsFilePath(parameters.get("tlscert").get)
        .authentication(tlsAuth)
        .enableTlsHostnameVerification(false)
        .allowTlsInsecureConnection(true)
        .build
    }
    else {
      PulsarClient.builder
        .serviceUrl(parameters.get("broker").get)
        .enableTlsHostnameVerification(false)
        .allowTlsInsecureConnection(true)
        .build
    }
  }
}

The error message I am receiving is as follows:

ERROR StreamExecution: Query [id = 12c715c2-2d62-4523-a37a-4555995ccb74, runId = d409c0db-7078-4654-b0ce-96e46dfb322c] terminated with error
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply$mcV$sp(Dataset.scala:2341)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition.apply(Dataset.scala:2341)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2340)
    at org.apache.spark.datamediation.impl.sink.PulsarSink.addBatch(PulsarSink.scala:20)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply$mcV$sp(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:666)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply$mcV$sp(StreamExecution.scala:306)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun$apply$mcZ$sp.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:206)
Caused by: java.io.NotSerializableException: org.apache.spark.datamediation.impl.sink.PulsarSink
Serialization stack:
    - object not serializable (class: org.apache.spark.datamediation.impl.sink.PulsarSink, value: org.apache.spark.datamediation.impl.sink.PulsarSink@38813f43)
    - field (class: org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, name: $outer, type: class org.apache.spark.datamediation.impl.sink.PulsarSink)
    - object (class org.apache.spark.datamediation.impl.sink.PulsarSink$$anonfun$addBatch, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
    ... 31 more

"foreachPartition" 中使用的值可以从 class 级别重新分配给函数变量:

override def addBatch(batchId: Long, data: DataFrame): Unit = {
  val parametersLocal = parameters  // local copy: the closure now captures only the Map
  data.toJSON.foreachPartition( partition => {
    val pulsarConfig = new PulsarConfig(parametersLocal).client
    // ... build the producer and send the partition's records exactly as before ...
  })
}
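Why the local copy helps can be demonstrated without Spark at all, since Spark's ClosureCleaner (the ensureSerializable call in the trace) fails for the same reason plain Java serialization does. Below is a minimal sketch with hypothetical names and no Spark or Pulsar dependency, contrasting a closure that reads the field directly with one that reads a local copy:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object ClosureCaptureDemo {

  // Throws NotSerializableException if the closure captured anything unserializable
  def trySerialize(obj: AnyRef): Unit =
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)

  // Stands in for PulsarSink: holds a serializable Map but is not Serializable itself
  class FakeSink(parameters: Map[String, String]) {

    // Reading the field compiles to this.parameters, so the closure
    // captures $outer (the whole FakeSink): the failure shown in the trace
    def badClosure: String => String = rec => parameters("topic") + rec

    // Copying the field to a local val first means the closure
    // captures only the Map, which serializes fine
    def goodClosure: String => String = {
      val parametersLocal = parameters
      rec => parametersLocal("topic") + rec
    }
  }

  def main(args: Array[String]): Unit = {
    val sink = new FakeSink(Map("topic" -> "demo-topic"))
    trySerialize(sink.goodClosure) // succeeds
    trySerialize(sink.badClosure)  // java.io.NotSerializableException: ...FakeSink
  }
}

The same reasoning applies to any field referenced inside foreachPartition, foreach, or map: either copy it to a local val before the closure, or make the enclosing class Serializable (marking unserializable fields @transient).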