如何在 Spark Streaming 应用程序中使用 Kafka 主题?

How do I consume Kafka topic inside spark streaming app?

当我从 Kafka 主题创建流并打印其内容时

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

我得到一个空结果

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

同时,它在控制台中工作:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

正确地给出了我在 Kafka 主题中的所有文本行:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

将数据从 Kafka 主题流式传输到 Spark 流应用程序的正确方法是什么?

我推荐使用 Spark 结构化流。它是Spark 2发布时自带的新一代流媒体引擎。你可以在这个link.

中查看

对于 Kafka 集成,您可以在此处查看文档 link

您在流输出中看不到任何数据的原因是 Spark Streaming 默认开始从 latest 读取数据。因此,如果您先启动您的 Spark Streaming 应用程序,然后将数据写入 Kafka,您将在流作业中看到输出。参考文档 here

By default, it will start consuming from the latest offset of each Kafka partition

但您也可以从主题的任何特定偏移量读取数据。看看 createDirectStream 方法 here。它需要一个字典参数 fromOffsets,您可以在其中指定字典中每个分区的偏移量。

我已经使用 kafka 2.2.0 和 spark 2.4.3 以及 Python 3.7.3 测试了以下代码:

启动 pyspark shell 具有 kafka 依赖项:

pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

运行 下面的代码:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)
topicPartion = TopicAndPartition('test',0)
fromOffset = {topicPartion: 0}

lines = KafkaUtils.createDirectStream(ssc, ['test'],{"bootstrap.servers": 'localhost:9092'}, fromOffsets=fromOffset)

lines.pprint()

ssc.start()
ssc.awaitTermination()

另外,如果你有 kafka 代理版本 10 或更高版本,你应该考虑使用结构化流而不是 Spark 流。请参阅结构化流文档 here and Structured Streaming with Kafka integration here.

下面是结构化流中 运行 的示例代码。 请根据你的kafka版本和spark版本使用jar版本。 我正在使用 spark 2.4.3Scala 11kafka 0.10 所以使用 jar spark-sql-kafka-0-10_2.11:2.4.3.

开始 pyspark shell:

pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "earliest") \
  .load()


df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("console") \
  .start()

根据您的代码,我们无法直接打印流式 RDD,应该基于 foreachRDD 进行打印。DStream.foreachRDD 是 Spark Streaming 中的一个 "output operator"。它允许您访问 DStream 的底层 RDD 以执行对数据做一些实际操作的操作。

What's the meaning of DStream.foreachRDD function?

注:: Still 你也可以通过structured streaming来实现。参考:

示例工作代码: 此代码尝试从 kafka 主题读取消息并打印它。您可以根据需要更改此代码。

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

def handler(message):
    records = message.collect()
    for record in records:
        print(record[1])

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    kvs = KafkaUtils.createDirectStream(ssc, ['topic_name'], {"metadata.broker.list": 'localhost:9192'},valueDecoder=serializer.decode_message)
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()