Pyspark Streaming with Kafka in PyCharm

I have recently been trying to debug the pyspark.streaming.kafka classes in PyCharm, since troubleshooting there is easier than debugging on a Linux box.

Here is my sample code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

sc = SparkContext(appName="sample app")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# {broker list} and {topic list} are placeholders for the actual Kafka
# broker addresses and a Python list of topic names.
kafkaParams = {"metadata.broker.list": "{broker list}",
               "auto.offset.reset": "smallest"}
kafka_stream = KafkaUtils.createDirectStream(ssc, {topic list}, kafkaParams)

However, I get the following error:

Traceback (most recent call last):
  File "C:\Program Files (x86)\JetBrains\PyCharm   5.0.3\helpers\pydev\pydevd.py", line 2411, in <module>
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files (x86)\JetBrains\PyCharm    5.0.3\helpers\pydev\pydevd.py", line 1802, in run
    launch(file, globals, locals)  # execute the script
  File "{script path}", line 30, in <module> {topic}], kafkaParams)
  File "C:\spark-1.6.0-bin-  hadoop2.6\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 152, in  createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o20.loadClass.
: java.lang.ClassNotFoundException:   org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)

16/02/22 11:45:49 INFO SparkContext: Invoking stop() from shutdown hook

I would appreciate any guidance on how to debug the PySpark Kafka streaming module in PyCharm.

Kafka support depends on the external spark-streaming-kafka JAR, which is not shipped with the Spark binaries. Normally it is specified at submit time with the --packages argument.
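
For reference, a submit-time invocation would look roughly like the line below, where your_streaming_job.py is just a placeholder for the actual script:

spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 your_streaming_job.py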

For local development with PyCharm, the simplest solution I can think of is to add it to $SPARK_HOME/conf/spark-defaults.conf. Assuming you are using Spark 1.6.0 built with Scala 2.10:

spark.jars.packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0
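
If editing spark-defaults.conf is not an option, a rough alternative sometimes used for IDE runs (a sketch, not tested here, and the package coordinate still has to match your Spark and Scala versions) is to set PYSPARK_SUBMIT_ARGS from the script itself before the SparkContext is created:

import os

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Must be set before the SparkContext is created; the trailing
# "pyspark-shell" token is required for the arguments to be picked up.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 pyspark-shell"
)

sc = SparkContext(appName="sample app")
ssc = StreamingContext(sc, 1)

Either way, Spark resolves the package at startup and puts the JAR on the driver and executor classpaths, which is what makes org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper loadable.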

Keep in mind that you won't be able to use the PyCharm debugger with the Python worker processes. See How can pyspark be called in debug mode?