如何使用 FlinkKafkaConsumer 单独解析 key <K,V> 而不是 <T>

How to use FlinkKafkaConsumer to parse key separately <K,V> instead of <T>

据我所知,通过 Flink 的 AVRO 反序列化,您可以创建一个 Avro 对象流,这很好,但似乎存在一个问题,即 Flink 的 kafka 消费者仅创建单个对象流: FlinkKafkaConsumerBase<T> 而不是默认的 Kafka API 及其 KafkaConsumer。

在我的例子中,键和值都是独立的符合 AVRO 模式的对象,合并它们的模式可能是一场噩梦...

此外,似乎使用 Flink API 我无法检索 ConsumerRecord 信息?...

基于Flink Kafka Consumer,有一个构造函数:

public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
    this(Collections.singletonList(topic), deserializer, props);
}

第二个参数-KeyedDeserializationSchema用于反序列化Kafka记录。它包括消息键、消息值、偏移量、主题等。因此,您可以将自己的名为 MyKafkaRecord 的类型实现为带有 Avro 键和 Avro 值的 T。然后将 MyKafkaRecord 作为 T 传递给 KeyedDeserializationSchema 的实现。以TypeInformationKeyValueSerializationSchema为例。

例如从 Kafka 读取额外信息:

class KafkaRecord<K, V> {
  private K key;
  private V value;
  private long offset;
  private int partition;
  private String topic;

  ...
}

class MySchema<K, V> implements KeyedDeserializationSchema<KafkaRecord<K, V>> {
  KafkaRecord<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    KafkaRecord<K, V> rec = new KafkaRecord<>();
    rec.key = KEY_DESERIaLISER.deserialize(messageKey);
    rec.value = ...;
    rec.topic = topic;
    ...
  }
}