kafka 将流式传输到 HDFS
kafka to sparkstreaming to HDFS
我正在使用 creatDirectStream 来集成 SparkStreaming 和 Kafka。这是我使用的代码:
val ssc = new StreamingContext(new SparkConf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
val topics = Set("topic1")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
现在我想将消息存储到HDFS 中。这样做对吗?
messages.saveAsTextFiles("/tmp/spark/messages")
saveAsTextFiles("/tmp/spark/messages")
- 这会将您的数据保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是您本地 HDFS 的一部分,那么它也会显示在 HDFS 目录中因为 saveAsTextFiles
利用相同的 MapeReduce API 来写入输出。
以上内容适用于 Spark Executors 和 HDFS 在同一台物理机器上的情况,但如果您的 HDFS 目录或 URL 不同且不在同一台机器上,则执行器 运行 , 那么这将不起作用。
如果您需要确保您的数据持久保存在 HDFS 中,那么最好始终提供完整的 HDFS URL。像这样 - saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")
或者您也可以利用以下任一方法:-
DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
DStream.saveAsHadoopFiles(<HDFS URL with Location>)
我正在使用 creatDirectStream 来集成 SparkStreaming 和 Kafka。这是我使用的代码:
val ssc = new StreamingContext(new SparkConf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
val topics = Set("topic1")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
现在我想将消息存储到HDFS 中。这样做对吗?
messages.saveAsTextFiles("/tmp/spark/messages")
saveAsTextFiles("/tmp/spark/messages")
- 这会将您的数据保存在本地文件系统中,如果提供的文件夹结构(“/tmp/spark/messages”)是您本地 HDFS 的一部分,那么它也会显示在 HDFS 目录中因为 saveAsTextFiles
利用相同的 MapeReduce API 来写入输出。
以上内容适用于 Spark Executors 和 HDFS 在同一台物理机器上的情况,但如果您的 HDFS 目录或 URL 不同且不在同一台机器上,则执行器 运行 , 那么这将不起作用。
如果您需要确保您的数据持久保存在 HDFS 中,那么最好始终提供完整的 HDFS URL。像这样 - saveAsTextFiles("http://<HOST-NAME>:9000/tmp/spark/messages")
或者您也可以利用以下任一方法:-
DStream.saveAsNewAPIHadoopFiles(<HDFS URL with Location>)
DStream.saveAsHadoopFiles(<HDFS URL with Location>)