Kafka Stream Ksql Json

Kafka Stream Ksql Json

kafka stream / Ksql 实际上以某种方式原生支持 json 吗?支持哪些其他格式?我看到是否可以将 json 解释为 table。我想更好地理解那部分; kafka-streams 通过 Ksql 可以通过 SQL 查询的其他格式是什么?这怎么可能或得到支持?原生支持是什么?

KSQL

对于值格式,KSQL 支持 AVRO、JSON 和 DELIMITED(如 CSV)。

您可以在此处找到文档:

卡夫卡流

Kafka Streams 在 org.apache.kafka.common.serialization 包下附带了一些 primitive/basic SerDes(序列化器/反序列化器)。

您可以在此处找到文档:

Confluent 还为通用 Avro 和特定 Avro 格式的数据提供 schema-registry 兼容的 Avro SerDes。您可以在此处找到文档:

您还可以使用 基本 SerDe 实现来实现示例附带的 JSON

作为最后的手段,您始终可以创建您自己的自定义 SerDes。为此,您必须:

  1. 通过实现为您的数据类型 T 编写序列化程序 org.apache.kafka.common.serialization.Serializer.
  2. 通过实现为 T 编写反序列化器 org.apache.kafka.common.serialization.Deserializer.
  3. 通过实现为 T 编写一个 serde org.apache.kafka.common.serialization.Serde,你要么做 手动(请参阅上一节中的现有 SerDes)或通过 利用 Serdes 中的辅助函数,例如 Serdes.serdeFrom(Serializer<T>, Deserializer<T>)。请注意,您将 如果你需要实现你自己的 class (没有通用类型) 想要在提供给的配置中使用您的自定义 serde 卡夫卡流。如果你的 serde class 有通用类型或者你使用 Serdes.serdeFrom(Serializer<T>, Deserializer<T>),你可以通过你的 serde 仅通过方法调用(例如 builder.stream("topicName", Consumed.with(...))).