我们如何从 Struct 数据中反序列化 value/key?
How do we deserialize value/key from Struct data?
我们使用 io.confluent.connect.sftp.SftpCsvSourceConnector
从 sftp 位置读取 csv 文件并将消息推送到 kafka.... 但是 kafka 主题中的消息不是 csv (字符串,逗号分隔),但是特定键值格式的comme Struct : ex Struct{branchBaseCurrCode=EUR, Country=CA} ...
我使用 Kafka-Streams....我该如何反序列化它?我应该使用什么配置?它反序列化到什么 java 对象?知道我的属性与模式完全相同,我可以将其直接反序列化到我的 POJO 吗?
SftpCsvSourceConnector
可以直接写在 json
而不是 Struct
吗?
配置您的连接器以使用适当的 Converter
,例如 Avro、Protobuf 等。如果需要,您甚至可以使用普通的 JSON。
例如:
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
见https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained
我们使用 io.confluent.connect.sftp.SftpCsvSourceConnector
从 sftp 位置读取 csv 文件并将消息推送到 kafka.... 但是 kafka 主题中的消息不是 csv (字符串,逗号分隔),但是特定键值格式的comme Struct : ex Struct{branchBaseCurrCode=EUR, Country=CA} ...
我使用 Kafka-Streams....我该如何反序列化它?我应该使用什么配置?它反序列化到什么 java 对象?知道我的属性与模式完全相同,我可以将其直接反序列化到我的 POJO 吗?
SftpCsvSourceConnector
可以直接写在 json
而不是 Struct
吗?
配置您的连接器以使用适当的 Converter
,例如 Avro、Protobuf 等。如果需要,您甚至可以使用普通的 JSON。
例如:
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
见https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained