我是否需要在模式注册表下为 kafka 流变更日志主题注册模式?
Do i need to register schema for kafka stream changelog topic under schema registry?
我正在使用处理器 API 和 Kafka StreamDSL 实现 kafka 流项目。
我在处理器中的处理函数是
@Override
public void process(final String key, final T event) {
keyValueStore.put(key, event);
}
我的拓扑是
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder<KeyValueStore<String, T>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
new EventSerializer(streamProperties()),
new EventDeserializer(streamProperties())));
builder.addStateStore(storeBuilder);
final KStream<String, T> stream = builder.stream(inputTopic);
stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
stream.to(outputTopic);
return builder.build();
}
最后这是我的自定义 EventSerializer class:
public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
implements Serializer<T> {
private final KafkaAvroSerializer inner;
public EventSerializer(Map<String, ?> properties) {
inner = new KafkaAvroSerializer();
configure(properties, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
}
@Override
public byte[] serialize(final String topic, final T record) {
return inner.serialize(topic, record);
}
}
当处理器将事件放入 keyValueStore 时,我遇到了错误 io.confluent.rest.exceptions.RestNotFoundException: Subject not found。 经过一段时间的调试,我意识到这是因为序列化程序序列化事件时遇到问题。函数 public byte[] serialize(final String topic, final T record)
中的主题是 application id-store-changelog
。这是卡夫卡的内在行为,尽管我不知道为什么。 Serialzer 找不到此组合主题的架构,因此引发错误。
我是否需要为这个组合主题注册模式,或者有什么方法可以将真正的消费者主题传递给已经注册了模式的序列化器吗?
当你有 new KafkaAvroSerializer();
时,它默认指向架构注册表的 localhost:8081
。
您不需要注册(尽管您可以),因为生产者使用 inner.serialize
作为序列化逻辑的一部分进行注册
注意:扩展 KafkaAvroSerializer
可能更有意义
我正在使用处理器 API 和 Kafka StreamDSL 实现 kafka 流项目。 我在处理器中的处理函数是
@Override
public void process(final String key, final T event) {
keyValueStore.put(key, event);
}
我的拓扑是
protected Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(stateStoreName);
StoreBuilder<KeyValueStore<String, T>> storeBuilder =
Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.serdeFrom(
new EventSerializer(streamProperties()),
new EventDeserializer(streamProperties())));
builder.addStateStore(storeBuilder);
final KStream<String, T> stream = builder.stream(inputTopic);
stream.process(() -> new Processor<>(stateStoreName), stateStoreName);
stream.to(outputTopic);
return builder.build();
}
最后这是我的自定义 EventSerializer class:
public class EventSerializer<T extends SpecificRecordBase & SpecificRecord>
implements Serializer<T> {
private final KafkaAvroSerializer inner;
public EventSerializer(Map<String, ?> properties) {
inner = new KafkaAvroSerializer();
configure(properties, false);
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(EventSerdeConfig.withProducerConfig(configs), isKey);
}
@Override
public byte[] serialize(final String topic, final T record) {
return inner.serialize(topic, record);
}
}
当处理器将事件放入 keyValueStore 时,我遇到了错误 io.confluent.rest.exceptions.RestNotFoundException: Subject not found。 经过一段时间的调试,我意识到这是因为序列化程序序列化事件时遇到问题。函数 public byte[] serialize(final String topic, final T record)
中的主题是 application id-store-changelog
。这是卡夫卡的内在行为,尽管我不知道为什么。 Serialzer 找不到此组合主题的架构,因此引发错误。
我是否需要为这个组合主题注册模式,或者有什么方法可以将真正的消费者主题传递给已经注册了模式的序列化器吗?
当你有 new KafkaAvroSerializer();
时,它默认指向架构注册表的 localhost:8081
。
您不需要注册(尽管您可以),因为生产者使用 inner.serialize
注意:扩展 KafkaAvroSerializer
可能更有意义