Unable to deserialize a custom Serde using Kafka-Streams
I am trying to create a simple topology with Kafka Streams that uppercases a Person entity.
case class Person(id: Int, name: String, age: Int)
My custom Serializer and Deserializer look like this:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

class KafkaBytesSerializer[T] extends Serializer[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, data: T): Array[Byte] = {
    // Java serialization: write the object into an in-memory byte buffer
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(data)
    oos.close()
    stream.toByteArray
  }

  override def close(): Unit = ()
}

class KafkaBytesDeserializer[T] extends Deserializer[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  override def deserialize(topic: String, data: Array[Byte]): T = {
    // Read the bytes back into an object and cast to the expected type
    val objIn = new ObjectInputStream(new ByteArrayInputStream(data))
    val obj = objIn.readObject().asInstanceOf[T]
    objIn.close()
    obj
  }

  override def close(): Unit = ()
}
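For reference, the serializer/deserializer pair can be sanity-checked in isolation with a quick round trip. This is a made-up standalone check (the object name is mine, and the topic argument is ignored by this serializer):

object SerdeRoundTripCheck extends App {
  val ser = new KafkaBytesSerializer[Person]
  val de  = new KafkaBytesDeserializer[Person]

  val original = Person(1, "user1", 30)
  val bytes    = ser.serialize("unused-topic", original) // topic is ignored by this serializer
  val restored = de.deserialize("unused-topic", bytes)

  println(s"round trip ok: ${restored == original}") // case-class equality; expected true
}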
The main invocation code of the streams application looks like this:
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.{Consumed, Produced}

val personSerde: Serde[Person] =
  Serdes.serdeFrom(new KafkaBytesSerializer[Person], new KafkaBytesDeserializer[Person])

val builder = new StreamsBuilder()
builder
  .stream[String, Person](INPUT_TOPIC)(Consumed.`with`(Serdes.String(), personSerde))
  .map[String, Person]((k, p) => (k, Person(p.id, p.name.toUpperCase(), p.age)))
  .peek((k, p) => println("Key: " + k + " Person: " + p))
  .to(OUTPUT_TOPIC)(Produced.`with`(Serdes.String(), personSerde))
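For completeness, this is how the topology is built and started; a minimal sketch, where the broker address is an assumption (the application id matches the one visible in the log below):

import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MainApp-consumer-group") // id seen in the log
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")      // assumed broker address

val streams = new KafkaStreams(builder.build(), props)
streams.start()
sys.addShutdownHook(streams.close())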
When I run the application, I get a ClassCastException:
[MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [MainApp-consumer-group-b45b436d-1412-494b-9733-f75a61c9b9e3-StreamThread-1] Encountered the following error during processing:
java.lang.ClassCastException: [B cannot be cast to models.Person
at org.apache.kafka.streams.scala.FunctionsCompatConversions$ValueMapperFromFunction$$anon.apply(FunctionsCompatConversions.scala:66)
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey(AbstractStream.java:103)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
I suspect something is going wrong at the deserialization level, but I am not sure why. Any pointers would be helpful.
The problem is in your ProducerApp. You set value.serializer to com.thebigscale.serdes.PersonSerializer, but then tried to send an already-serialized byte array. You shouldn't serialize your POJO yourself; the configured Kafka serializer does that for you. Just send the Person object instance. Below I have fixed your code, with comments:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "com.thebigscale.serdes.PersonSerializer")
val producer = new KafkaProducer[String, Person](props) // <-- Instead BYTE_ARRAY -> Person
val person = new Person(4, "user4", 27)
//val personSerializer = new KafkaBytesSerializer[Person]() // remove
//val bytePerson: BYTE_ARRAY = personSerializer.serialize("", person) // remove
val record = new ProducerRecord[String, Person](KafkaConf.INPUT_TOPIC, "key1", person) // instead BYTE_ARRAY -> Person, bytePerson -> person
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null ) {
println("Exception thrown by producer: " + exception)
} else {
println("Record sent successfully: " + metadata)
}
}
})
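To verify the fix end to end, you can read OUTPUT_TOPIC back with a plain consumer that reuses the deserializer from the question; a sketch, with broker address and group id assumed:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

val cProps = new Properties()
cProps.put("bootstrap.servers", "localhost:9092") // assumed broker address
cProps.put("group.id", "person-verify")           // arbitrary group id
cProps.put("auto.offset.reset", "earliest")

// Pass the deserializer instances directly instead of configuring them by class name
val consumer = new KafkaConsumer[String, Person](cProps, new StringDeserializer, new KafkaBytesDeserializer[Person])
consumer.subscribe(Collections.singletonList(KafkaConf.OUTPUT_TOPIC))

consumer.poll(Duration.ofSeconds(5)).forEach { rec =>
  println(rec.key + " -> " + rec.value) // should print the uppercased Person
}
consumer.close()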