在 Python 中使用 Spark Structured Streaming 从 Kafka 读取数据并打印到控制台
Read data from Kafka and print to console with Spark Structured Sreaming in Python
我 kafka_2.13-2.7.0 Ubuntu 20.04。我 运行 kafka 服务器和 zookeeper 然后创建一个主题并通过 nc -lk 9999
在其中发送一个文本文件。该主题充满了数据。另外,我的系统上有 spark-3.0.1-bin-hadoop2.7。事实上,我想使用 kafka 主题作为 Spark Structured Streaming 的来源 python。我的代码是这样的:
spark = SparkSession \
.builder \
.appName("APP") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sparktest") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
我 运行 上述代码通过 spark-submit 使用此命令:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py
代码 运行 无一例外,我在 Spark 站点中收到此输出:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
我的问题是 kafka 主题充满了数据;但是由于 运行 输出中的代码没有任何数据。你能指导我这里有什么问题吗?
代码不会打印出任何数据,只会为您提供一次架构。
您可以按照通用Structured Streaming Guide and the Structured Streaming + Kafka integration Guide中给出的说明查看如何将数据打印到控制台。请记住,在 Spark 中读取数据是一种惰性操作,没有任何操作(通常是 writeStream
操作)什么都不做。
如果您补充如下代码,您应该会看到所选数据(键和值)打印到控制台:
spark = SparkSession \
.builder \
.appName("APP") \
.getOrCreate()
df = spark\
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sparktest") \
.option("startingOffsets", "earliest") \
.load()
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("console") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.start()
query.awaitTermination()
我 kafka_2.13-2.7.0 Ubuntu 20.04。我 运行 kafka 服务器和 zookeeper 然后创建一个主题并通过 nc -lk 9999
在其中发送一个文本文件。该主题充满了数据。另外,我的系统上有 spark-3.0.1-bin-hadoop2.7。事实上,我想使用 kafka 主题作为 Spark Structured Streaming 的来源 python。我的代码是这样的:
spark = SparkSession \
.builder \
.appName("APP") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sparktest") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
我 运行 上述代码通过 spark-submit 使用此命令:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py
代码 运行 无一例外,我在 Spark 站点中收到此输出:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
我的问题是 kafka 主题充满了数据;但是由于 运行 输出中的代码没有任何数据。你能指导我这里有什么问题吗?
代码不会打印出任何数据,只会为您提供一次架构。
您可以按照通用Structured Streaming Guide and the Structured Streaming + Kafka integration Guide中给出的说明查看如何将数据打印到控制台。请记住,在 Spark 中读取数据是一种惰性操作,没有任何操作(通常是 writeStream
操作)什么都不做。
如果您补充如下代码,您应该会看到所选数据(键和值)打印到控制台:
spark = SparkSession \
.builder \
.appName("APP") \
.getOrCreate()
df = spark\
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sparktest") \
.option("startingOffsets", "earliest") \
.load()
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("console") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.start()
query.awaitTermination()