用于自定义 JSON 的 Serde class(Kotlin 和 Kafka)

Serde class for custom JSON (Kotlin & Kafka)

我正在 Kotlin 中编写一个 kafka 流应用程序,它使用 JSON 消息(没有 AVRO 或架构注册表)。

在 MyMessage.kt 中,我已将 MyMessage class 声明为 @Serializable

MyMessage.kt

import kotlinx.serialization.Serializable

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

Streaming.kt

val s: KString<String, MyMessage> = streamsBuilder()
    .stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)

当 运行 时,我在上面的行中收到以下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID

我错过了什么?

Serdes.serdesFrom() 的参数需要一个 SerializerDeserializer 对象(这两个接口都是包 org.apache.kafka.common.serialization 中的 Kafka 接口,与 @Serializable注解.

您需要创建 类 MyMessageSerializer extends SerializerMyMessageDeserialzer extends Deserializer 并将这些对象传递到方法中。 要使用 类 实现实际的序列化/反序列化,如果需要,您可以依赖默认的序列化/反序列化。