如何确定 Kafka 消息的主题
How to determine topic for a Kafka message
我有如下代码从 Kafka 读取并保存到 Elasticsearch,我正在使用 spark streaming:
JavaDStream<String> liness = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
System.out.println(kafkaRecord.topic() + kafkaRecord.value());
return kafkaRecord.value();
}
});
JavaEsSparkStreaming.saveJsonToEs(lines, "events/redict");
jssc.start();
我的问题是我需要根据主题名称使 "events/redict"
动态化,我也可以在 messages.map
调用中获取主题名称,但是如何添加动态性在吗?
您可以在此处添加 "dynamism",方法是为每个主题创建一个流,或者将您的 messages
流过滤为 events
和 redict
RDD。
分别将这两个保存到正确的 ES 索引中。
我有如下代码从 Kafka 读取并保存到 Elasticsearch,我正在使用 spark streaming:
JavaDStream<String> liness = messages.map(new Function<ConsumerRecord<String, String>, String>() {
@Override
public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
System.out.println(kafkaRecord.topic() + kafkaRecord.value());
return kafkaRecord.value();
}
});
JavaEsSparkStreaming.saveJsonToEs(lines, "events/redict");
jssc.start();
我的问题是我需要根据主题名称使 "events/redict"
动态化,我也可以在 messages.map
调用中获取主题名称,但是如何添加动态性在吗?
您可以在此处添加 "dynamism",方法是为每个主题创建一个流,或者将您的 messages
流过滤为 events
和 redict
RDD。
分别将这两个保存到正确的 ES 索引中。