从spark中的kafka消息中获取主题

get topic from kafka message in spark

在我们的 spark-streaming 作业中,我们从 kafka 读取流中的消息。

为此,我们使用 KafkaUtils.createDirectStream API 其中 returns JavaPairInputDStreamfrom.

通过以下方式从 kafka(来自三个主题 - test1、test2、test3)读取消息:

private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));

HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);

JavaPairInputDStream<String, String> messages = 
KafkaUtils.createDirectStream(
                streamingContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
                );

我们想以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每个消息的主题名称。

所以我们执行以下操作:

JavaDStream<String> lines = messages.map(new SplitToLinesFunction());

这是SplitToLinesFunction的实现:

public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
    @Override
    public String call(Tuple2<String, String> tuple2) 
    {
        System.out.println(tuple2._1);
        return tuple2._2();
    }
}

问题是 tuple2._1 为空,我们假设 tuple2._1 将包含一些元数据,例如消息来源的 topic/partition 的名称。

然而,当我们打印 tuple2._1 时,它是空的。

我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 spark-streaming 代码中,tuple2._1 将包含它(而不是 null)?

请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称:

但它 returns 发送到 KafkaUtils.createDirectStream 的所有主题,而不是消息(属于当前 RDD)到达的特定主题。

所以它没有帮助我们识别主题的名称,从那里发送 RDD 中的消息。

编辑

作为对 David 回答的回应——我试过这样使用 MessageAndMetadata

        Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
        topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
        topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);

        class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
        {

            @Override
            public String call(MessageAndMetadata<String, String> v1)
                    throws Exception {
                // nothing is printed here
                System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
                return v1.topic();
            }

        }

        JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
        messages.foreachRDD(new VoidFunction() {

            @Override
            public void call(Object t) throws Exception {
                JavaRDD<String> rdd = (JavaRDD<String>)t;
                OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
                // here all the topics kafka listens to are printed, but that doesn't help
                for (OffsetRange offset : offsets) {
                    System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
                }
            }
        });

问题是 MessageAndMetadataFunction.call 方法中没有打印任何内容。我应该修复什么才能在 MessageAndMetadataFunction.call 方法中获取该 RDD 的相关主题?

使用 createDirectStream 的一个版本,该版本将 messageHandler 函数作为参数。这是我的做法:

val messages = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (String, Array[Byte]](
  ssc,
  kafkaParams,
  getPartitionsAndOffsets(topics).map(t => (t._1, t._2._1).toMap,
  (msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}
)

那里有些东西对你来说没有任何意义 -- 相关部分是

(msg: MessageAndMetadata[Array[Byte],Array[Byte]]) => { (msg.topic, msg.message)}

如果您不熟悉 Scala,该函数所做的只是 return 包含 msg.topicmsg.messageTuple2。您的函数需要 return 这两个,以便您在下游使用它们。您可以只 return 整个 MessageAndMetadata 对象,这会为您提供一些其他有趣的字段。但是如果你只想要 topicmessage,那么使用上面的

Kafka integration guide 的底部,有一个从消息中提取主题的示例。

Java中的相关代码:

 // Hold a reference to the current offset ranges, so it can be used downstream
 final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

 directKafkaStream.transformToPair(
   new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
     @Override
     public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
       OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
       offsetRanges.set(offsets);
       return rdd;
     }
   }
 ).map(
   ...
 ).foreachRDD(
   new Function<JavaPairRDD<String, String>, Void>() {
     @Override
     public Void call(JavaPairRDD<String, String> rdd) throws IOException {
       for (OffsetRange o : offsetRanges.get()) {
         System.out.println(
           o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
         );
       }
       ...
       return null;
     }
   }
 );

这可能会被折叠成更紧凑的内容,只要求主题而不是其他内容。