Can't call a function from Spark Streaming 'RDD.foreachPartition' but copying all lines of the function works

I am trying to write a Spark RDD stream out from the worker nodes instead of first collecting it at the driver. So I created the following code:

  def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
    rdd.foreachPartition { partitionOfRecords =>
      val producer = new KafkaProducer[K, V](getProducerProps(keySerializerClass, valueSerializerClass, brokers))
      partitionOfRecords.foreach { message =>
        producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
      }
      producer.close()
    }
  }

  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = {
    val producerProps: Properties = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
    producerProps
  }

Running this code results in the following exception:

15/09/01 15:13:00 ERROR JobScheduler: Error running job streaming job 1441120380000 ms.3
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:805)
at com.company.opt.detector.StreamingDetector.writeToKafka(StreamingDetector.scala:84)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork.apply(MyClass.scala:47)
at com.company.opt.MyClass.MyClass$$anonfun$doStreamingWork.apply(MyClass.scala:47)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD.apply(DStream.scala:534)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:176)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:176)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.company.opt.MyClass.MyClass$
Serialization stack:
- object not serializable (class: com.company.opt.MyClass.MyClass$, value: com.company.opt.MyClass.MyClass$@7e2bb5e0)
- field (class: com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka, name: $outer, type: class com.company.opt.detector.StreamingDetector)
- object (class com.company.opt.detector.StreamingDetector$$anonfun$writeToKafka, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 21 more

However, when I copy the body of the getProducerProps function directly into my writeToKafka function, as shown below, everything works fine.

  def writeToKafka[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], topic: String, keySerializerClass: String, valueSerializerClass: String, brokers: String = producerBroker) = {
    rdd.foreachPartition { partitionOfRecords =>
      val producerProps: Properties = new Properties
      producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
      producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
      producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
      val producer = new KafkaProducer[K, V](producerProps)
      partitionOfRecords.foreach { message =>
        producer.send(new ProducerRecord[K, V](topic, message._1, message._2))
      }
      producer.close()
    }
  }

Can someone explain why this happens? Thanks.

Since getProducerProps is a method of the class enclosing the closure, calling it from inside the closure is equivalent to calling this.getProducerProps(...). The problem then becomes clear: this is pulled into the closure and must be serialized along with all of its fields. Some member of that class is not serializable, which produces this exception.
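
To see the same capture in isolation, here is a minimal sketch with a hypothetical, non-serializable class (the names are made up for illustration):

import org.apache.spark.rdd.RDD

// Hypothetical example: Helper is not Serializable.
class Helper {
  def prefix: String = "value-"

  def tag(rdd: RDD[Int]): RDD[String] =
    // `prefix` here is really `this.prefix`, so the closure captures the whole
    // Helper instance, and Spark throws "Task not serializable" when it tries
    // to ship the closure to the executors.
    rdd.map(i => prefix + i)
}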

A good practice is to put such methods in a separate object:

object ProducerUtils extends Serializable {
  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = ???
}
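
For completeness, a sketch of that object filled in with the same property-building code from the question (only the placement changes, not the logic):

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object ProducerUtils extends Serializable {
  // Same body as getProducerProps in the question, now on a standalone object.
  def getProducerProps(keySerializerClass: String, valueSerializerClass: String, brokers: String): Properties = {
    val producerProps = new Properties
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
    producerProps
  }
}

Inside foreachPartition you would then call ProducerUtils.getProducerProps(keySerializerClass, valueSerializerClass, brokers); the call targets the standalone object rather than the enclosing class, so this is no longer captured by the closure.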

Another approach is to turn the method into a function and assign it to a val. The value of the val is then captured directly, so the entire instance is not pulled into the serializable closure:

val producerProps: (String,String,String) => Properties = ???
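
A sketch of what that val could look like, again reusing the property code from the question (imports as in the previous sketch):

// The closure would capture this function value (itself serializable)
// instead of calling a method on the enclosing instance.
val producerProps: (String, String, String) => Properties =
  (keySerializerClass, valueSerializerClass, brokers) => {
    val props = new Properties
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass)
    props
  }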

I agree with maasg's answer. You might also find this post interesting; it explores the topic of making sure exactly which data in a closure gets serialized by Spark.