pyspark is unable to find KafkaUtils.createDirectStream
I have the following pyspark script, which is supposed to connect to a local Kafka cluster:
from pyspark import SparkContext
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES
def main():
    sc = SparkContext(appName=APP_NAME)
    # 2-second batch interval
    ssc = StreamingContext(sc, 2)
    # Expected command-line arguments: <broker list> <topic>
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    # Each record is a (key, value) pair; keep only the message value
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":
    main()
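To make the per-batch logic of the script explicit, here is a plain-Python sketch of what one batch goes through: keep the value of each (key, value) record, split it on single spaces, pair each word with 1, and sum by word. It needs no Spark or Kafka; `count_words` is a hypothetical helper, not part of PySpark, and is only meant for checking the transformation on sample data.

```python
from collections import Counter


def count_words(records):
    """Mimic one batch of the DStream pipeline on a list of (key, value) pairs.

    kvs.map(lambda x: x[1])                -> take each message value
    flatMap(lambda line: line.split(" "))  -> split into words
    map((word, 1)) + reduceByKey(a + b)    -> sum the 1s per word
    """
    counts = Counter()
    for _key, value in records:
        # split(" ") mirrors the script exactly, so repeated spaces yield ""
        counts.update(value.split(" "))
    return dict(counts)


if __name__ == "__main__":
    batch = [(None, "hello kafka"), (None, "hello spark")]
    print(count_words(batch))  # {'hello': 2, 'kafka': 1, 'spark': 1}
```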
When I run it, I get the following error:
File "/home/ubuntu/spark-1.3.0-bin-hadoop2.4/hello1.py", line 16, in main
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
AttributeError: type object 'KafkaUtils' has no attribute 'createDirectStream'
What do I need to do to get access to KafkaUtils.createDirectStream?
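You are using Spark 1.3.0, but the Python version of createDirectStream was only introduced in Spark 1.4.0. Spark 1.3 provides it for Scala and Java only.
If you want to use the direct stream from Python, you have to upgrade your Spark installation to 1.4.0 or later.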
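Since the fix comes down to which Spark version is installed, a script could check for this up front and fail with a clear message instead of an AttributeError. The sketch below is a minimal illustration under that assumption: `parse_spark_version`, `meets_direct_stream_minimum` and `has_direct_stream` are hypothetical helpers, not PySpark APIs. The version check encodes only the 1.4 minimum stated in the answer; the feature check simply tests whether the given class has a callable `createDirectStream`.

```python
import re

# Minimum Spark release whose Python KafkaUtils provides createDirectStream,
# as stated in the answer. Only this lower bound is checked here.
MIN_DIRECT_STREAM_VERSION = (1, 4)


def parse_spark_version(version):
    """Return (major, minor) from a version string such as '1.3.0'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError("unrecognized Spark version: %r" % version)
    return int(match.group(1)), int(match.group(2))


def meets_direct_stream_minimum(version):
    """True if `version` is at least the release that added the Python API.

    Compares integer tuples, so '1.10' correctly sorts after '1.4'.
    """
    return parse_spark_version(version) >= MIN_DIRECT_STREAM_VERSION


def has_direct_stream(kafka_utils_cls):
    """Feature detection: does the given class expose createDirectStream?"""
    return callable(getattr(kafka_utils_cls, "createDirectStream", None))


if __name__ == "__main__":
    # "1.3.0" matches the spark-1.3.0-bin-hadoop2.4 install from the question.
    for v in ("1.3.0", "1.4.0"):
        print(v, meets_direct_stream_minimum(v))
```

In the question's script, calling `has_direct_stream(KafkaUtils)` right after the import and exiting with an "upgrade to Spark 1.4 or later" message would turn the AttributeError into an explicit requirement. Feature detection is arguably the sturdier of the two checks, because it inspects the installed class itself rather than trusting a version string.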