java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord in Kafka
I am a beginner with Apache Kafka. The following code samples are from my Kafka producer and consumer.
Kafka producer code:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public static void main(String[] args) throws Exception {
    String inputTopic = "inputTopic";
    String broker = "localhost:9092";

    // Both keys and values are sent as Strings.
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(properties);
    String message = " >>>>>>>>>>>>>>>>>>>>> Data Message";
    int key = 0;
    while (key < 1) {
        key = key + 1;
        ProducerRecord<String, String> record =
                new ProducerRecord<>(inputTopic, String.valueOf(key), message + " " + key);
        producer.send(record).get(); // block until the broker acknowledges
    }
    producer.close();
}
The next piece of code is the Kafka consumer.
Kafka consumer code:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SimpleDStreamExample");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    Collection<String> topics = Arrays.asList("inputTopic");

    // Consumer settings mirroring the producer's String (de)serialization.
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "spark-demo");
    kafkaParams.put("kafka.consumer.id", "kafka-consumer-01");
    kafkaParams.put("auto.offset.reset", "earliest");
    kafkaParams.put("enable.auto.commit", false);

    JavaStreamingContext ssc = new JavaStreamingContext(jsc, new Duration(1000));
    JavaInputDStream<ConsumerRecord<String, String>> inputStream = KafkaUtils.createDirectStream(
            ssc, LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    inputStream.print();

    ssc.start();
    ssc.awaitTermination();
    ssc.close();
}
However, the Kafka consumer throws the following exception:
19/09/16 20:57:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
19/09/16 20:57:58 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = inputTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1568634372952, serialized key size = 1, serialized value size = 38, headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = >>>>>>>>>>>>>>>>>>>>> Data Message 1))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 2); not retrying
19/09/16 20:57:58 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
I can't understand the cause of these exceptions. I confirmed on the Kafka producer side that the messages are of type <String, String>, so why is the consumed message not serializable?
Comparing your code with the example given in the documentation: you never extract any data from the ConsumerRecord, so Spark tries to collect and print the records themselves. ConsumerRecord does not implement java.io.Serializable, and print() sends the first records of each batch back to the driver, which requires serializing them; that is exactly what the "had a not serializable result" error reports. Map each record to its key and value (plain Strings) before printing:
// Requires: import scala.Tuple2; and org.apache.spark.streaming.api.java.JavaPairDStream
JavaPairDStream<String, String> outStream =
        inputStream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
outStream.print();
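If you only need the message payload, a plain map over the values works just as well. This is a minimal sketch under the same setup as the consumer above; replace the inputStream.print() call in your main method with it (the variable name outValues is my choice):

// Extract only the String value, so nothing non-serializable leaves the executors.
// Requires: import org.apache.spark.streaming.api.java.JavaDStream;
JavaDStream<String> outValues = inputStream.map(ConsumerRecord::value);
outValues.print();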