java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord in Kafka

I am a beginner with Apache Kafka. The following code samples are my Kafka producer and consumer.

Kafka producer code:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] args) throws Exception {
    String inputTopic = "inputTopic";
    String broker = "localhost:9092";

    Properties properties = new Properties();
    properties.put("bootstrap.servers", broker);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    String message = "  >>>>>>>>>>>>>>>>>>>>> Data Message";
    int key = 0;

    while (key < 1) {
        key = key + 1;

        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(inputTopic, String.valueOf(key), (message + " " + key));
        producer.send(record).get();  // block until the broker acknowledges the record
    }

    producer.close();
}

The following is the Kafka consumer code:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleDStreamExample");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    Collection<String> topics = Arrays.asList("inputTopic");

    Map<String, Object> kafkaParams = new HashMap<String, Object>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark-demo");
    kafkaParams.put("kafka.consumer.id", "kafka-consumer-01");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);

    JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));

    JavaInputDStream<ConsumerRecord<String, String>> inputStream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    inputStream.print();  // this line triggers the exception below
    ssc.start();
    ssc.awaitTermination();
    ssc.close();
}

But the Kafka consumer throws the following exception:

19/09/16 20:57:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value =   >>>>>>>>>>>>>>>>>>>>> Data Message 1))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
19/09/16 20:57:58 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value =   >>>>>>>>>>>>>>>>>>>>> Data Message 1))
    - element of array (index: 0)
    - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2); not retrying
19/09/16 20:57:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

I cannot understand the cause of these exceptions. I confirmed that the producer sends messages of type <String, String>. So why is the consumed message not serializable?

If we compare your code with the example given in the documentation, you are printing the ConsumerRecord objects themselves. ConsumerRecord does not implement java.io.Serializable, and print() must serialize each record and collect it to the driver, which is exactly what fails. Extract the data you need from each record first, for example into key/value pairs, and print those instead:

// mapToPair on a DStream yields a JavaPairDStream (not a JavaPairRDD);
// Tuple2 comes from scala.Tuple2.
JavaPairDStream<String, String> outStream =
        inputStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
outStream.print();
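
If you only need the message payload, a minimal alternative (a sketch against the same `inputStream` as above) is to map the stream down to its String values before printing:

import org.apache.spark.streaming.api.java.JavaDStream;

// String is serializable, so a JavaDStream<String> can be collected and
// printed by the driver without hitting the NotSerializableException.
JavaDStream<String> values = inputStream.map(record -> record.value());
values.print();

Either way, the point is the same: transform the stream so that only serializable objects (Strings, Tuple2s of Strings, etc.) leave the executors, rather than the raw ConsumerRecord instances.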