使用 Python 将 Apache Kafka 与 Apache Spark Streaming 集成
Integrating Apache Kafka with Apache Spark Streaming using Python
我正在尝试使用 Python 将 Apache Kafka 与 Apache Spark Streaming 集成(我对所有这些都不熟悉)。
为此我做了以下步骤
- 启动 Zookeeper
- 启动 Apache Kafka
- 在 Apache Kafka 中添加了主题
- 设法使用此命令列出可用主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
- 我从这里获取了 Kafka 字数统计代码
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py
代码是
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
- 我使用命令
执行了代码
./spark-submit /root/girish/python/kafkawordcount.py localhost:2181
我遇到了这个错误
Traceback (most recent call last):
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 72, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
- 我已经使用这个问题的答案更新了执行代码
到
./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
现在我收到这个错误
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'
请帮忙解决这个问题。
提前致谢
PS:我正在使用 Apache Spark 1.2
问题已通过使用 Apache Spark 1.3 解决,它对 Python 的支持比版本 1.2
更好
遇到同样的问题,已通过添加 kafka-assembly 包修复
bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 ~/py/sparkjob.py
根据你的spark和kafka版本使用。
我正在尝试使用 Python 将 Apache Kafka 与 Apache Spark Streaming 集成(我对所有这些都不熟悉)。
为此我做了以下步骤
- 启动 Zookeeper
- 启动 Apache Kafka
- 在 Apache Kafka 中添加了主题
- 设法使用此命令列出可用主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
- 我从这里获取了 Kafka 字数统计代码
https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/kafka_wordcount.py
代码是
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
- 我使用命令 执行了代码
./spark-submit /root/girish/python/kafkawordcount.py localhost:2181
我遇到了这个错误
Traceback (most recent call last):
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 72, in createStream
raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o23.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
- 我已经使用这个问题的答案更新了执行代码
到
./spark-submit --jars /root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/lib/spark-streaming-kafka_2.10-1.3.1.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/kafka_2.10-0.8.1.2.2.0.0-2041.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/zkclient-0.3.jar,/usr/hdp/2.2.0.0-2041/kafka/libs/metrics-core-2.2.0.jar /root/girish/python/kafkawordcount.py localhost:2181 <topic name>
现在我收到这个错误
File "/root/girish/python/kafkawordcount.py", line 28, in <module>
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/pyspark/streaming/kafka.py", line 67, in createStream
jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 529, in __call__
File "/root/spark-1.2.0.2.2.0.0-82-bin-2.6.0.2.2.0.0-2041/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 265, in get_command_part
AttributeError: 'dict' object has no attribute '_get_object_id'
请帮忙解决这个问题。
提前致谢
PS:我正在使用 Apache Spark 1.2
问题已通过使用 Apache Spark 1.3 解决,它对 Python 的支持比版本 1.2
更好遇到同样的问题,已通过添加 kafka-assembly 包修复
bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 ~/py/sparkjob.py
根据你的spark和kafka版本使用。