如何确定 Kafka 消息的主题

How to determine topic for a Kafka message

我有如下代码从 Kafka 读取并保存到 Elasticsearch,我正在使用 spark streaming:

JavaDStream<String> liness = messages.map(new Function<ConsumerRecord<String, String>, String>() {
        @Override
        public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
            System.out.println(kafkaRecord.topic() + kafkaRecord.value());
            return kafkaRecord.value();
        }
    });

        JavaEsSparkStreaming.saveJsonToEs(lines, "events/redict");
        jssc.start();

我的问题是我需要根据主题名称使 "events/redict" 动态化,我也可以在 messages.map 调用中获取主题名称,但是如何添加动态性在吗?

您可以在此处添加 "dynamism",方法是为每个主题创建一个流,或者将您的 messages 流过滤为 eventsredict RDD。

分别将这两个保存到正确的 ES 索引中。