sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper() TypeError: 'JavaPackage' object is not callable when using
sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper() TypeError: 'JavaPackage' object is not callable when using
我正在学习如何将 spark
与 kafka
整合。目前我创建了 virtualenv
并安装了 pyspark
、py4j
包。
我还配置了这些环境:
- PYSPARK_PYTHON :
C:\learn_new\learn_utils\venv\Scripts\python.exe
- SPARK_HOME :
C:\spark-2.4.3-bin-hadoop2.7
那我要运行例子python源码C:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py
脚本代码是这样的:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
运行 virtualenv
下的 python 代码的命令行是这样的:
python --default --client --host localhost --port 60614 c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py kafka_host_name:9092 topic_name
然后我得到这个错误:
File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
helper = KafkaUtils._get_helper(ssc._sc)
File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable
这是什么问题?
非常感谢。
我主要想在本地调试代码,所以不想用spark-submit加上--jars
或 --packages 参数 运行 代码。
但是确实需要spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar包。(这里根据你的spark版本更改包版本)
所以我尝试下载包并将其保存到C:\spark-2.4.3-bin-hadoop2。7\jars(将其更改为您的spark安装路径,并找到jars文件夹) .
那么问题就解决了。希望对其他人有帮助。
我遇到了类似的问题,只是将 jar 分别添加到两个地方,一个是 spark 拥有所有 jar 的地方。其次,将 jar 添加到 pyspark 的 jar 中,该 jar 保存在当前 python 版本中的不同位置。它奏效了
我正在学习如何将 spark
与 kafka
整合。目前我创建了 virtualenv
并安装了 pyspark
、py4j
包。
我还配置了这些环境:
- PYSPARK_PYTHON :
C:\learn_new\learn_utils\venv\Scripts\python.exe
- SPARK_HOME :
C:\spark-2.4.3-bin-hadoop2.7
那我要运行例子python源码C:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py
脚本代码是这样的:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
运行 virtualenv
下的 python 代码的命令行是这样的:
python --default --client --host localhost --port 60614 c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py kafka_host_name:9092 topic_name
然后我得到这个错误:
File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
helper = KafkaUtils._get_helper(ssc._sc)
File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable
这是什么问题? 非常感谢。
我主要想在本地调试代码,所以不想用spark-submit加上--jars 或 --packages 参数 运行 代码。
但是确实需要spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar包。(这里根据你的spark版本更改包版本)
所以我尝试下载包并将其保存到C:\spark-2.4.3-bin-hadoop2。7\jars(将其更改为您的spark安装路径,并找到jars文件夹) .
那么问题就解决了。希望对其他人有帮助。
我遇到了类似的问题,只是将 jar 分别添加到两个地方,一个是 spark 拥有所有 jar 的地方。其次,将 jar 添加到 pyspark 的 jar 中,该 jar 保存在当前 python 版本中的不同位置。它奏效了