Pyspark Streaming with Kafka in PyCharm
I have recently been trying to debug the pyspark.streaming.kafka classes in PyCharm, since that makes troubleshooting easier than doing it on a Linux box.
Here is my sample code:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
sc = SparkContext(appName="sample app")
ssc = StreamingContext(sc, 1)
kafkaParams = {"metadata.broker.list": "{broker list}",
               "auto.offset.reset": "smallest"}
kafka_stream = KafkaUtils.createDirectStream(ssc, {topic list}, kafkaParams)
However, I get the following error:
Traceback (most recent call last):
File "C:\Program Files (x86)\JetBrains\PyCharm 5.0.3\helpers\pydev\pydevd.py", line 2411, in <module>
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Program Files (x86)\JetBrains\PyCharm 5.0.3\helpers\pydev\pydevd.py", line 1802, in run
launch(file, globals, locals) # execute the script
File "{script path}", line 30, in <module> {topic}], kafkaParams)
File "C:\spark-1.6.0-bin- hadoop2.6\python\lib\pyspark.zip\pyspark\streaming\kafka.py", line 152, in createDirectStream
py4j.protocol.Py4JJavaError: An error occurred while calling o20.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Unknown Source)
16/02/22 11:45:49 INFO SparkContext: Invoking stop() from shutdown hook
I would appreciate any guidance on how to debug the PySpark Kafka streaming module in PyCharm.
Kafka support depends on the external spark-streaming-kafka JAR, which is not shipped with the Spark binaries. Normally it is specified at submit time with the --packages argument.
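For reference, a command-line submission would look roughly like this (the script name is only a placeholder, not something from the question):

spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  your_streaming_job.py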
For local development with PyCharm, the simplest solution I can think of is adding it to $SPARK_HOME/conf/spark-defaults.conf. Assuming you use Spark 1.6.0 built against Scala 2.10:
spark.jars.packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0
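If editing spark-defaults.conf isn't an option, a minimal sketch of an alternative is to set PYSPARK_SUBMIT_ARGS before the SparkContext is created (for example in the PyCharm run configuration, or at the top of the script). This sketch assumes the same Spark 1.6.0 / Scala 2.10 package coordinates as above:

import os

# Pull in the Kafka package when PySpark launches the JVM gateway.
# The trailing "pyspark-shell" token is required by PySpark's launcher.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 pyspark-shell"
)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="sample app")
ssc = StreamingContext(sc, 1)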
Keep in mind that you won't be able to use the PyCharm debugger with the Python worker processes. See How can pyspark be called in debug mode?