How to get the processing kafka topic name dynamically in Flink Kafka Consumer?

Currently, I have a Flink cluster that wants to consume Kafka topics by a pattern, so that we do not need to maintain a hard-coded list of Kafka topic names.

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

I just want to know: with the approach above, how can I find out the actual Kafka topic name during processing? Thanks.

--Update-- The reason we need to know the topic information is that we need the topic name as a parameter in the Flink sink part that follows.

There are two ways to do this.

Option 1:

You can use the kafka-clients library to access the Kafka metadata and get the list of topics. Add the Maven dependency (or equivalent).

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

You can fetch the topics from the Kafka cluster and filter them with your regex, as shown below:

import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;
...
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");
...
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("client.id", "java-admin-client");
try (AdminClient client = AdminClient.create(properties)) {
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(false); // exclude internal topics such as __consumer_offsets
    Collection<TopicListing> listings = client.listTopics(options).listings().get();
    List<String> allTopicsList = listings.stream()
            .map(TopicListing::name)
            .collect(Collectors.toList());
    List<String> matchedTopics = allTopicsList.stream()
            .filter(topicPattern.asPredicate())
            .collect(Collectors.toList());
} catch (Exception e) {
    e.printStackTrace();
}

Once you have the matchedTopics list, you can pass it to the FlinkKafkaConsumer.
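For example, a minimal sketch that reuses the deserializer and consumer properties from your question (deserializerClazz and kafkaConsumerProps are the names you already defined):

FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          matchedTopics, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

Keep in mind that a fixed list is resolved when the job starts, so topics created later are not picked up automatically; that is one reason Option 2 is usually preferable.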

Option 2:

FlinkKafkaConsumer011, as of Flink release 1.8, supports topic and partition discovery dynamically based on a pattern. Below is an example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    topicPattern,
    new SimpleStringSchema(),
    properties);

Link: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
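Note that, according to the linked docs, partition discovery is disabled by default; for the consumer to also pick up topics created after the job has started, set a non-negative value for flink.partition-discovery.interval-millis in the consumer properties, e.g.:

// Check for new topics/partitions matching the pattern every 10 seconds.
properties.setProperty("flink.partition-discovery.interval-millis", "10000");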

In your case, Option 2 is the best fit.

Since you want to access the topic metadata as part of the KafkaMessage, you need to implement the KafkaDeserializationSchema interface, as shown below:

import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<KafkaMessage> {

    /**
     * Deserializes a Kafka record.
     *
     * @param record the ConsumerRecord, which gives access to the key, value,
     *               topic, partition and offset of the message.
     * @return the deserialized message as an object (null if the message cannot be deserialized).
     */
    @Override
    public KafkaMessage deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        // You can access record.key(), record.value(), record.topic(),
        // record.partition() and record.offset() to get the topic information.
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setTopic(record.topic());
        // Build your KafkaMessage here and assign the remaining fields the same way.
        return kafkaMessage;
    }

    @Override
    public boolean isEndOfStream(KafkaMessage nextElement) {
        return false;
    }

    @Override
    public TypeInformation<KafkaMessage> getProducedType() {
        return TypeInformation.of(KafkaMessage.class);
    }
}

Then call it like this:

FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);

Alternatively, you can implement your own custom KafkaDeserializationSchema, as below:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}

With the custom KafkaDeserializationSchema, you can create a DataStream whose elements contain the topic information. In my demo case the element type is Tuple2<String, String>, so you can access the topic name via Tuple2#f0:
FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String, String>, String>() {
    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
        String topicName = value.f0;
        // your processing logic here.
        out.collect(value.f1);
    }
});
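Regarding your update: one way to reuse the topic name in the Kafka sink is KeyedSerializationSchema#getTargetTopic, which overrides the producer's default topic for each record when it returns a non-null value. Below is only a minimal sketch for the Tuple2<String, String> stream above; kafkaProducerProps, the "DEFAULT_TOPIC" fallback and the "_OUT" suffix are made-up placeholders, not part of your setup:

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
...
input.addSink(new FlinkKafkaProducer010<>(
        "DEFAULT_TOPIC", // placeholder fallback, used only when getTargetTopic returns null
        new KeyedSerializationSchema<Tuple2<String, String>>() {
            @Override
            public byte[] serializeKey(Tuple2<String, String> element) {
                return null; // no message key
            }

            @Override
            public byte[] serializeValue(Tuple2<String, String> element) {
                return element.f1.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public String getTargetTopic(Tuple2<String, String> element) {
                // Route each record based on the source topic name (placeholder suffix).
                return element.f0 + "_OUT";
            }
        },
        kafkaProducerProps));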