使用 Spark Streaming 后无输出
No output after using the Spark Streaming
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topicsSet)
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
rdd.saveAsTextFile("output");
return rdd;
}
}).map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> kv) {
return kv._2();
}
});
stream1.print();
jssc.start();
jssc.awaitTermination();
交叉检查主题"test4"中是否存在有效数据。
我期待从 kafka 集群流式传输的字符串在控制台的 console.No 异常中打印,但也没有输出。
我在这里遗漏了什么吗?
正如ccheneson所说,这可能是因为你缺少.start()
和.awaitTermination()
也可能是因为transformations in Spark are lazy,这意味着您需要添加一个操作来获取结果。例如
stream1.print();
或者可能是因为 map
正在执行程序上执行,所以输出将在执行程序的日志中,而不是驱动程序的日志中。
在启动流应用程序后,您是否尝试过在您的主题中生成数据?
默认direct stream使用配置auto.offset.reset = largest,意思是当没有初始offset时自动重置为最大offset,所以基本上你只能读取进入的新消息在流应用程序启动后的主题中。
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
String topics = "test4";
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(" ")));
JavaDStream<String> stream1 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topicsSet)
.transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
@Override
public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
rdd.saveAsTextFile("output");
return rdd;
}
}).map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> kv) {
return kv._2();
}
});
stream1.print();
jssc.start();
jssc.awaitTermination();
交叉检查主题"test4"中是否存在有效数据。
我期待从 kafka 集群流式传输的字符串在控制台的 console.No 异常中打印,但也没有输出。 我在这里遗漏了什么吗?
正如ccheneson所说,这可能是因为你缺少.start()
和.awaitTermination()
也可能是因为transformations in Spark are lazy,这意味着您需要添加一个操作来获取结果。例如
stream1.print();
或者可能是因为 map
正在执行程序上执行,所以输出将在执行程序的日志中,而不是驱动程序的日志中。
在启动流应用程序后,您是否尝试过在您的主题中生成数据?
默认direct stream使用配置auto.offset.reset = largest,意思是当没有初始offset时自动重置为最大offset,所以基本上你只能读取进入的新消息在流应用程序启动后的主题中。