kafka 将流式传输到 HDFS

kafka to sparkstreaming to HDFS

我正在使用 creatDirectStream 来集成 SparkStreaming 和 Kafka。这是我使用的代码:

val ssc = new StreamingContext(new SparkConf, Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
    val topics = Set("topic1")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

现在我想将消息存储到HDFS 中。这样做对吗?

messages.saveAsTextFiles("/tmp/spark/messages")

saveAsTextFiles("/tmp/spark/messages") - 这会将您的数据保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是您本地 HDFS 的一部分,那么它也会显示在 HDFS 目录中因为 saveAsTextFiles 利用相同的 MapeReduce API 来写入输出。

以上内容适用于 Spark Executors 和 HDFS 在同一台物理机器上的情况,但如果您的 HDFS 目录或 URL 不同且不在同一台机器上,则执行器 运行 , 那么这将不起作用。

如果您需要确保您的数据持久保存在 HDFS 中,那么最好始终提供完整的 HDFS URL。像这样 - saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")

或者您也可以利用以下任一方法:-

  1. DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
  2. DStream.saveAsHadoopFiles(<HDFS URL with Location>)