Kafka Streams - GenericAvroSerde 上的未知魔法字节
Kafka Streams - Unknown magic byte on GenericAvroSerde
尝试使用 Kafka Streams 传输 Avro 数据时,我遇到了这个错误:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
尽管我在邮件列表中发现了几个关于它的旧线程,none 中所述的解决方案解决了这个问题。希望我能在这里找到解决方案。
我的设置如下所示:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
我已经尝试将 KEY_SERDE
设置为与 VALUE_SERDE
相同的设置,但即使这是 "marked" 作为邮件列表中的解决方案,它在我的案例.
我正在使用我的架构生成 GenericData.Record
,如下所示:
val record = new GenericData.Record(schema)
...
record.put(field, value)
当我启动调试模式并检查生成的记录时,一切正常,记录中有数据并且映射正确。
我像这样流式传输 KStream(我之前使用过分支):
splitTopics.get(0).to(s"${destTopic}_Testing")
我正在使用 GenericData.Record
作为记录。这可能是与 GenericAvroSerde
?
结合使用时出现的问题
我的问题的解决方案是在反序列化我从输入主题中获得的字符串值后交换 VALUE_SERDE
。
由于 Serde
是序列化和反序列化的组合 "element",我不能简单地使用 StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
,而是必须使用 StringSerde
来反序列化输入记录,并且只然后使用 AvroSerde
将其写出到输出主题。
现在看起来像这样:
// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
p
}
// adjusted output serdes for avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
),
/* isKeySerde = */ false
)
// Now using the adjusted serdes to write to output like this
stream.to(keySerde, valSerde, "destTopic")
这样,它就像魅力一样。
谢谢
尝试使用 Kafka Streams 传输 Avro 数据时,我遇到了这个错误:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
尽管我在邮件列表中发现了几个关于它的旧线程,none 中所述的解决方案解决了这个问题。希望我能在这里找到解决方案。
我的设置如下所示:
StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)
我已经尝试将 KEY_SERDE
设置为与 VALUE_SERDE
相同的设置,但即使这是 "marked" 作为邮件列表中的解决方案,它在我的案例.
我正在使用我的架构生成 GenericData.Record
,如下所示:
val record = new GenericData.Record(schema)
...
record.put(field, value)
当我启动调试模式并检查生成的记录时,一切正常,记录中有数据并且映射正确。
我像这样流式传输 KStream(我之前使用过分支):
splitTopics.get(0).to(s"${destTopic}_Testing")
我正在使用 GenericData.Record
作为记录。这可能是与 GenericAvroSerde
?
我的问题的解决方案是在反序列化我从输入主题中获得的字符串值后交换 VALUE_SERDE
。
由于 Serde
是序列化和反序列化的组合 "element",我不能简单地使用 StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]
,而是必须使用 StringSerde
来反序列化输入记录,并且只然后使用 AvroSerde
将其写出到输出主题。
现在看起来像这样:
// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
p
}
// adjusted output serdes for avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
),
/* isKeySerde = */ false
)
// Now using the adjusted serdes to write to output like this
stream.to(keySerde, valSerde, "destTopic")
这样,它就像魅力一样。
谢谢