用于自定义 JSON 的 Serde class(Kotlin 和 Kafka)
Serde class for custom JSON (Kotlin & Kafka)
我正在 Kotlin 中编写一个 kafka 流应用程序,它使用 JSON 消息(没有 AVRO 或架构注册表)。
在 MyMessage.kt 中,我已将 MyMessage
class 声明为 @Serializable
。
MyMessage.kt
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)
Streaming.kt
val s: KString<String, MyMessage> = streamsBuilder()
.stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)
当 运行 时,我在上面的行中收到以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID
我错过了什么?
Serdes.serdesFrom()
的参数需要一个 Serializer
和 Deserializer
对象(这两个接口都是包 org.apache.kafka.common.serialization
中的 Kafka 接口,与 @Serializable
注解.
您需要创建 类 MyMessageSerializer extends Serializer
和 MyMessageDeserialzer extends Deserializer
并将这些对象传递到方法中。
要使用 类 实现实际的序列化/反序列化,如果需要,您可以依赖默认的序列化/反序列化。
我正在 Kotlin 中编写一个 kafka 流应用程序,它使用 JSON 消息(没有 AVRO 或架构注册表)。
在 MyMessage.kt 中,我已将 MyMessage
class 声明为 @Serializable
。
MyMessage.kt
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)
Streaming.kt
val s: KString<String, MyMessage> = streamsBuilder()
.stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)
当 运行 时,我在上面的行中收到以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID
我错过了什么?
Serdes.serdesFrom()
的参数需要一个 Serializer
和 Deserializer
对象(这两个接口都是包 org.apache.kafka.common.serialization
中的 Kafka 接口,与 @Serializable
注解.
您需要创建 类 MyMessageSerializer extends Serializer
和 MyMessageDeserialzer extends Deserializer
并将这些对象传递到方法中。
要使用 类 实现实际的序列化/反序列化,如果需要,您可以依赖默认的序列化/反序列化。