如何从 apache nifi 生成 kafka 主题中的 Avro 消息,然后使用 kafka 流读取它?

How to produce Avro message in kafka topic from apache nifi and then read it using kafka streams?

我想使用 apache nifi 将一些通用数据生成到 kafka 主题中,我希望这些数据采用 avro 格式。 我为此做了什么:

  1. 在模式注册表中创建新模式:

{ "type": "record", "name": "my_schema", "namespace": "my_namespace", "doc": "", "fields": [ { "name": "key", "type": "int" }, { "name": "value", "type": [ "null", "int" ] }, { "name": "event_time", "type": "long" } ] }

  1. 创建简单的nifi管道: ConvertAvroSchema 设置: PublishKafkaRecord 设置: AvroReader 设置: AvroRecordSetWriter 设置:
  2. 然后我尝试使用kafka流读取它:

    publicclass测试{ private final static Logger 记录器 = Logger.getLogger(KafkaFilterUsingCacheAvro.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
    
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");
    
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> source = builder.stream("topic");
        source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));
    
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }
    

    }

GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java

结果我得到错误:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我还尝试在 avroreader\writer 中显式设置 avro 架构,但没有帮助。此外,如果我尝试简单地从主题中读取字节并将其转换为字符串表示形式,我会得到如下信息:

Objavro.schema{"type":"record","name":"my_schema","namespace":"my_namespace","doc":"","fields":[{"name":"key","type":"int"},{"name":"value","type":["null","int"]},{"name":"event_time","type":"long"}]}avro.codecsnappyÛ4ým[©q ÃàG0 ê¸ä»/}½{Û4ým[©q ÃàG0

我该如何解决?

在 PublishKafka 处理器中,您的 Avro writer 配置了 "Schema Write Strategy" of "Embedded Avro Schema"。这意味着写入 Kafka 的消息是嵌入了完整模式的标准 Avro 消息。

在您的消费者端(Kafka 流),它看起来期望使用汇合模式注册表,在这种情况下,它不期望嵌入式 Avro 模式,它期望指定模式 ID 的特殊字节序列,然后是纯 Avro 消息。

假设您希望保持消费者不变,那么在 NiFi 方面,您将希望将 Avro writer 的 "Schema Write Strategy" 更改为 "Confluent Schema Registry Reference"。我认为这可能还需要您更改 Avro reader 以使用 Confluent Schema Registry 服务访问架构。

或者,也许有一种方法可以让 Kafka Streams 读取嵌入式模式而不使用 Confluent 模式注册表,但我之前没有使用过 Kafka Streams,所以我不能说这是否可行。