从 kafka Spark 流接收时获取空值

Getting empty values while receiving from kafka Spark streaming

我是 Spark streaming 的新手,我正在实施一些小练习,例如从 XML 发送数据=]kafka 并且需要通过 spark streaming 接收 streaming 数据。 我尝试了所有可能的方式..但每次我收到 空值。

Kafka 端没有问题,唯一的问题是从 Spark 端接收 Streaming 数据。

这是我如何实现的代码:

package com.package;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("mytopic");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
        directKafkaStream.foreachRDD(rdd -> {
        System.out.println("--- New RDD with " + rdd.partitions().size()
            + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> System.out.println(record._2));
        });


        ssc.start();
        ssc.awaitTermination();
    }
}

我正在使用以下版本:

**动物园管理员 3.4.6

斯卡拉 2.11

Spark 2.0

卡夫卡 0.8.2**

您的 Spark Streaming 应用程序看起来不错。我测试了它,它正在打印 kafka 消息。您也可以尝试下面的 "Message Received" print 语句来验证 kafka 消息。

    directKafkaStream.foreachRDD(rdd -> {
    System.out.println("Message Received "+rdd.values().take(5));
    System.out.println("--- New RDD with " + rdd.partitions().size()
        + " partitions and " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
    });

如果您使用的是 Zookeeper,那么也将其设置为 kafka 参数

kafkaParams.put("zookeeper.connect","localhost:2181");

我在您的程序中没有看到以下导入语句,因此在此处添加。

import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;

另请验证您是否可以使用命令行 kafka-console-consumer 使用主题 "mytopic" 上的消息。

你可以这样:

directKafkaStream.foreachRDD(rdd ->{            
            rdd.foreachPartition(item ->{
                while (item.hasNext()) {    
                    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"+item.next());
}
}
});

itme.next() 包含键值对。你可以通过使用获得价值 item.next()._2