sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper() TypeError: 'JavaPackage' object is not callable when using

sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper() TypeError: 'JavaPackage' object is not callable when using

我正在学习如何将 sparkkafka 整合。目前我创建了 virtualenv 并安装了 pysparkpy4j 包。

我还配置了这些环境:

  1. PYSPARK_PYTHON : C:\learn_new\learn_utils\venv\Scripts\python.exe
  2. SPARK_HOME : C:\spark-2.4.3-bin-hadoop2.7

那我要运行例子python源码C:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py

脚本代码是这样的:

    from __future__ import print_function
    import sys


    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils


    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

运行 virtualenv 下的 python 代码的命令行是这样的:

python --default --client --host localhost --port 60614 c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\streaming\direct_kafka_wordcount.py kafka_host_name:9092 topic_name

然后我得到这个错误:

    File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
        helper = KafkaUtils._get_helper(ssc._sc)
    File "c:\spark-2.4.3-bin-hadoop2.7\examples\src\main\python\venv\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
        return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
    TypeError: 'JavaPackage' object is not callable

这是什么问题? 非常感谢。

我主要想在本地调试代码,所以不想用spark-submit加上--jars 或 --packages 参数 运行 代码。

但是确实需要spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar包。(这里根据你的spark版本更改包版本)

所以我尝试下载包并将其保存到C:\spark-2.4.3-bin-hadoop2。7\jars(将其更改为您的spark安装路径,并找到jars文件夹) .

那么问题就解决了。希望对其他人有帮助。

我遇到了类似的问题,只是将 jar 分别添加到两个地方,一个是 spark 拥有所有 jar 的地方。其次,将 jar 添加到 pyspark 的 jar 中,该 jar 保存在当前 python 版本中的不同位置。它奏效了