如何将 spark streaming 保存到本地 pc 和 hdfs?
how to save spark streaming to local pc and hdfs?
尝试过此数据正在流式传输,但无法在本地磁盘或 hdfs 中以元组的形式保存该数据。
从 pyspark 导入 SparkConf,SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
def process(RDD):
#RDD.pprint()
kvs2=RDD.map()
kvs2.saveAsTextFiles('path')
#kvs.foreachRDD(lambda x: process(x))
#kvs1=kvs.map(lambda x: x)
kvs.pprint()
kvs.saveAsTextFiles('path','txt')
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
在这一行:
kvs.saveAsTextFiles('path','txt')
您正在存储原始流,而不是带有元组的流。改为从 counts 存储:
counts.saveAsTextFiles('path','txt')
很好奇 'path'.
中提供的目录下保存在工作节点上的文件
pySpark 不支持保存到 HDFS API 至于最新版本,其他语言有 saveAsHadoopFiles。 Link 到 doc.
尝试过此数据正在流式传输,但无法在本地磁盘或 hdfs 中以元组的形式保存该数据。 从 pyspark 导入 SparkConf,SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
def process(RDD):
#RDD.pprint()
kvs2=RDD.map()
kvs2.saveAsTextFiles('path')
#kvs.foreachRDD(lambda x: process(x))
#kvs1=kvs.map(lambda x: x)
kvs.pprint()
kvs.saveAsTextFiles('path','txt')
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
在这一行:
kvs.saveAsTextFiles('path','txt')
您正在存储原始流,而不是带有元组的流。改为从 counts 存储:
counts.saveAsTextFiles('path','txt')
很好奇 'path'.
中提供的目录下保存在工作节点上的文件pySpark 不支持保存到 HDFS API 至于最新版本,其他语言有 saveAsHadoopFiles。 Link 到 doc.