KafkaRDD scala 最小示例

KafkaRDD scala minimal example

我正在尝试使用 KafkaRDD 获取 运行 示例:

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val offsetRanges = Array(
    OffsetRange("topic", 0, 0, 2)
)
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
rdd.map(x => println(x)).collect()

res: Array[Unit] = Array((), ())

我很小心地创建了 "topic" 单个分区并写了 2 条消息,你好,世界。

我可以得到看起来像正确的 RDD,但我怎样才能访问它的内容?我错过了什么吗?

谢谢,E.

问题是这一行,我相信:

rdd.map(x => println(x)).collect()

RDD 的工作方式,rdd.map 在执行器上运行。当您 println 时,它会将其打印到 stdout 以供执行者使用。要在驱动程序应用程序中将其打印到 stdout,请尝试以下操作:

rdd.collect().map(x => println(x))