如何在 Spark Streaming 应用程序中使用 Kafka 主题?
How do I consume Kafka topic inside spark streaming app?
当我从 Kafka 主题创建流并打印其内容时
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonStreamingKafkaWords")
ssc = StreamingContext(sc, 10)
lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})
lines.pprint()
ssc.start()
ssc.awaitTermination()
我得到一个空结果
-------------------------------------------
Time: 2019-12-07 13:11:50
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:00
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:10
-------------------------------------------
同时,它在控制台中工作:
kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092
正确地给出了我在 Kafka 主题中的所有文本行:
ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
ham Ard 6 like dat lor.
ham Why don't you wait 'til at least wednesday to see if you get your .
ham Huh y lei...
spam REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
spam This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
ham Will ü b going to esplanade fr home?
. . .
将数据从 Kafka 主题流式传输到 Spark 流应用程序的正确方法是什么?
我推荐使用 Spark 结构化流。它是Spark 2发布时自带的新一代流媒体引擎。你可以在这个link.
中查看
对于 Kafka 集成,您可以在此处查看文档 link。
您在流输出中看不到任何数据的原因是 Spark Streaming 默认开始从 latest
读取数据。因此,如果您先启动您的 Spark Streaming 应用程序,然后将数据写入 Kafka,您将在流作业中看到输出。参考文档 here:
By default, it will start consuming from the latest offset of each Kafka partition
但您也可以从主题的任何特定偏移量读取数据。看看 createDirectStream
方法 here。它需要一个字典参数 fromOffsets
,您可以在其中指定字典中每个分区的偏移量。
我已经使用 kafka 2.2.0 和 spark 2.4.3 以及 Python 3.7.3 测试了以下代码:
启动 pyspark
shell 具有 kafka 依赖项:
pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0
运行 下面的代码:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
topicPartion = TopicAndPartition('test',0)
fromOffset = {topicPartion: 0}
lines = KafkaUtils.createDirectStream(ssc, ['test'],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)
lines.pprint()
ssc.start()
ssc.awaitTermination()
另外,如果你有 kafka 代理版本 10 或更高版本,你应该考虑使用结构化流而不是 Spark 流。请参阅结构化流文档 here and Structured Streaming with Kafka integration here.
下面是结构化流中 运行 的示例代码。
请根据你的kafka版本和spark版本使用jar版本。
我正在使用 spark 2.4.3
与 Scala 11
和 kafka 0.10
所以使用 jar spark-sql-kafka-0-10_2.11:2.4.3
.
开始 pyspark
shell:
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("console") \
.start()
根据您的代码,我们无法直接打印流式 RDD,应该基于 foreachRDD 进行打印。DStream.foreachRDD 是 Spark Streaming 中的一个 "output operator"。它允许您访问 DStream 的底层 RDD 以执行对数据做一些实际操作的操作。
What's the meaning of DStream.foreachRDD function?
注:: Still 你也可以通过structured streaming来实现。参考:
示例工作代码: 此代码尝试从 kafka 主题读取消息并打印它。您可以根据需要更改此代码。
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
def handler(message):
records = message.collect()
for record in records:
print(record[1])
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'},valueDecoder=serializer.decode_message)
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
当我从 Kafka 主题创建流并打印其内容时
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonStreamingKafkaWords")
ssc = StreamingContext(sc, 10)
lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})
lines.pprint()
ssc.start()
ssc.awaitTermination()
我得到一个空结果
-------------------------------------------
Time: 2019-12-07 13:11:50
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:00
-------------------------------------------
-------------------------------------------
Time: 2019-12-07 13:12:10
-------------------------------------------
同时,它在控制台中工作:
kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092
正确地给出了我在 Kafka 主题中的所有文本行:
ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
ham Ard 6 like dat lor.
ham Why don't you wait 'til at least wednesday to see if you get your .
ham Huh y lei...
spam REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
spam This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
ham Will ü b going to esplanade fr home?
. . .
将数据从 Kafka 主题流式传输到 Spark 流应用程序的正确方法是什么?
我推荐使用 Spark 结构化流。它是Spark 2发布时自带的新一代流媒体引擎。你可以在这个link.
中查看对于 Kafka 集成,您可以在此处查看文档 link。
您在流输出中看不到任何数据的原因是 Spark Streaming 默认开始从 latest
读取数据。因此,如果您先启动您的 Spark Streaming 应用程序,然后将数据写入 Kafka,您将在流作业中看到输出。参考文档 here:
By default, it will start consuming from the latest offset of each Kafka partition
但您也可以从主题的任何特定偏移量读取数据。看看 createDirectStream
方法 here。它需要一个字典参数 fromOffsets
,您可以在其中指定字典中每个分区的偏移量。
我已经使用 kafka 2.2.0 和 spark 2.4.3 以及 Python 3.7.3 测试了以下代码:
启动 pyspark
shell 具有 kafka 依赖项:
pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0
运行 下面的代码:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
topicPartion = TopicAndPartition('test',0)
fromOffset = {topicPartion: 0}
lines = KafkaUtils.createDirectStream(ssc, ['test'],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)
lines.pprint()
ssc.start()
ssc.awaitTermination()
另外,如果你有 kafka 代理版本 10 或更高版本,你应该考虑使用结构化流而不是 Spark 流。请参阅结构化流文档 here and Structured Streaming with Kafka integration here.
下面是结构化流中 运行 的示例代码。
请根据你的kafka版本和spark版本使用jar版本。
我正在使用 spark 2.4.3
与 Scala 11
和 kafka 0.10
所以使用 jar spark-sql-kafka-0-10_2.11:2.4.3
.
开始 pyspark
shell:
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("console") \
.start()
根据您的代码,我们无法直接打印流式 RDD,应该基于 foreachRDD 进行打印。DStream.foreachRDD 是 Spark Streaming 中的一个 "output operator"。它允许您访问 DStream 的底层 RDD 以执行对数据做一些实际操作的操作。
What's the meaning of DStream.foreachRDD function?
注:: Still 你也可以通过structured streaming来实现。参考:
示例工作代码: 此代码尝试从 kafka 主题读取消息并打印它。您可以根据需要更改此代码。
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
def handler(message):
records = message.collect()
for record in records:
print(record[1])
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 10)
kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'},valueDecoder=serializer.decode_message)
kvs.foreachRDD(handler)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()