Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer

Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer

I'm trying to send Java String messages with a Kafka producer. The string messages are extracted from a Java Spark JavaPairDStream:
JavaPairDStream<String, String> processedJavaPairStream = inputStream
        .mapToPair(record -> new Tuple2<>(record.key(), record.value()))
        .mapValues(message -> message.replace('>', '#'));

String outTopics = "outputTopic";
String broker = "localhost:9092";

Properties properties = new Properties();
properties.put("bootstrap.servers", broker);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(properties);
processedJavaPairStream.foreachRDD(rdd -> rdd.foreach(tuple2 -> {

    ProducerRecord<String, String> message = new ProducerRecord<String, String>(outTopics, tuple2._1, tuple2._2);

    System.out.println(message.key() + " : " + message.value()); //(1)

    producer.send(message).get(); //(2)
}));

Line (1) prints the message strings correctly. But when I send those messages with the Kafka producer at line (2), it throws the following exception:

Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@10f6530d)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample, functionalInterfaceMethod=org/apache/spark/api/java/function/VoidFunction.call:(Ljava/lang/Object;)V, implementation=invokeStatic com/aaa/StreamingKafkaDataWithSpark/streaming/SimpleDStreamExample.lambda:(Ljava/lang/String;Lorg/apache/kafka/clients/producer/Producer;Lscala/Tuple2;)V, instantiatedMethodType=(Lscala/Tuple2;)V, numCaptured=2])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample$$Lambda1/0x000000010077e440, com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample$$Lambda1/0x000000010077e440@7bb180e1)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 1)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=interface org.apache.spark.api.java.JavaRDDLike, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/api/java/JavaRDDLike.$anonfun$foreach$adapted:(Lorg/apache/spark/api/java/function/VoidFunction;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=1])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.api.java.JavaRDDLike$$Lambda2/0x000000010077c840, org.apache.spark.api.java.JavaRDDLike$$Lambda2/0x000000010077c840@7fb1cd32)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 30 more
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$foreach(RDD.scala:926)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
    at org.apache.spark.api.java.JavaRDDLike.foreach(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike.foreach$(JavaRDDLike.scala:350)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
    at com.aaa.StreamingKafkaDataWithSpark.streaming.SimpleDStreamExample.lambda(SimpleDStreamExample.java:72)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$adapted(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$adapted(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

I can't make sense of this exception. Line (1) confirms that the producer message really is of type <String, String>. So why does line (2) throw this exception? Am I missing a step?

You need to create the producer for each RDD.

The RDD is distributed across multiple executors, and the Producer object cannot be serialized to be shared between them.
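
A minimal sketch of that approach (my own suggestion, reusing the question's broker and outTopics variables and the same imports): build the producer inside foreachPartition, so it is constructed on the executor and never captured by a serialized closure; doing it per partition also avoids opening a new connection for every record.

processedJavaPairStream.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
    // Built here, on the executor; only Strings cross the driver/executor boundary
    Properties props = new Properties();
    props.put("bootstrap.servers", broker);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        while (partition.hasNext()) {
            Tuple2<String, String> tuple2 = partition.next();
            producer.send(new ProducerRecord<>(outTopics, tuple2._1, tuple2._2));
        }
        producer.flush(); // make sure everything is sent before the partition finishes
    }
}));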


Alternatively, look at the documentation of Structured Streaming: to write to a topic you can simply do the following, with no need to create and send records yourself.

stream.writeStream().format("kafka")... 
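
For reference, a fuller sketch of that idea (not the question's code: the input topic name, the checkpoint path, and the SparkSession variable spark are assumptions here). It reads from Kafka, applies the same '>' to '#' replacement, and writes back to Kafka:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

Dataset<Row> stream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "inputTopic")               // assumed source topic
        .load();

StreamingQuery query = stream
        .selectExpr("CAST(key AS STRING) AS key",
                    "translate(CAST(value AS STRING), '>', '#') AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "outputTopic")
        .option("checkpointLocation", "/tmp/checkpoint") // any durable path; required by the Kafka sink
        .start();

query.awaitTermination();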

Note that if the goal is only to map one topic to another, the Kafka Streams API is simpler and has less overhead than Spark.
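
For comparison, a minimal Kafka Streams sketch doing the same topic-to-topic mapping (the application id and input topic name are illustrative; the character replacement mirrors the question's mapValues):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TopicMapper {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topic-mapper");      // illustrative app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("inputTopic")                         // assumed source topic
               .mapValues(value -> value.replace('>', '#'))
               .to("outputTopic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}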