Kafka 字符串到 Dataframe - pyspark

Kafka string to Dataframe - pyspark

我有一个 Kafka 生产者:

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('topic', ('12', 'AB DD', 'targer_1', '18'))
producer.send('topic', ('33', 'CC FF', 'target_2', '23'))

Spark 消费者应该处理这个流:

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountw")
ssc = StreamingContext(sc, 4)
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])

请帮助我将此流转换为可查询的 JSON 其中键值结构是这样的:

{"A": '12', "B": 'AB DD', "C": 'targer_1', "D": '18'}

我想像这样过滤对象流:

Df.select("A", "C").where("D > 19")

并将其发送回 Kafka。 如果您有任何建议,我很乐意听取。

使用结构化流和完整的 JSON 编码对您来说会容易得多。将数据写入 JSON

from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import *

producer = KafkaProducer(
   value_serializer=lambda v: json.dumps(dict(zip(["A", "B", "C", "D"], v))).encode('utf-8')
)

使用 Spark Kafka 阅读 reader(您必须包含 spark-sql-kafka 包):

df = (spark.readStream.format("kafka")
   .option("kafka.bootstrap.servers", brokers)
   .option("subscribe", "topic")
   .load())

定义模式:

schema = StructType([StructField(c, StringType()) for c in ["A", "B", "C", "D"]])

解析、过滤和写入

(df
    # Parse JSON
    .select(from_json(col("value").cast("string"), schema).alias("value"))
    # Filter
    .where(col("value.D").cast("integer") > 19)
    # Serialize to JSON
    .select(to_json("value").alias("value"))
    # And write
    .writeStream
    .format("kafka")
    .option("topic", output_topic)
    .option("kafka.bootstrap.servers", brokers)
    .option("checkpointLocation", checkpont_directory)
    .start())

使用旧的 API 你可以:

  • Use 可以使用 valueDecodermessageHandler 参数 createDirectStream 来解码传入数据。您也可以使用 map.
  • 使用foreachPartition可以:

    • 将转换后的数据转换为 DataFrame
    • 过滤掉记录。
    • 开始制作人。
    • 写入卡夫卡。