Kafka 字符串到 Dataframe - pyspark
Kafka string to Dataframe - pyspark
我有一个 Kafka 生产者:
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('topic', ('12', 'AB DD', 'targer_1', '18'))
producer.send('topic', ('33', 'CC FF', 'target_2', '23'))
Spark 消费者应该处理这个流:
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountw")
ssc = StreamingContext(sc, 4)
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
请帮助我将此流转换为可查询的 JSON
其中键值结构是这样的:
{"A": '12', "B": 'AB DD', "C": 'targer_1', "D": '18'}
我想像这样过滤对象流:
Df.select("A", "C").where("D > 19")
并将其发送回 Kafka。
如果您有任何建议,我很乐意听取。
使用结构化流和完整的 JSON 编码对您来说会容易得多。将数据写入 JSON
from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import *
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(dict(zip(["A", "B", "C", "D"], v))).encode('utf-8')
)
使用 Spark Kafka 阅读 reader(您必须包含 spark-sql-kafka
包):
df = (spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "topic")
.load())
定义模式:
schema = StructType([StructField(c, StringType()) for c in ["A", "B", "C", "D"]])
解析、过滤和写入
(df
# Parse JSON
.select(from_json(col("value").cast("string"), schema).alias("value"))
# Filter
.where(col("value.D").cast("integer") > 19)
# Serialize to JSON
.select(to_json("value").alias("value"))
# And write
.writeStream
.format("kafka")
.option("topic", output_topic)
.option("kafka.bootstrap.servers", brokers)
.option("checkpointLocation", checkpont_directory)
.start())
使用旧的 API 你可以:
- Use 可以使用
valueDecoder
或 messageHandler
参数 createDirectStream
来解码传入数据。您也可以使用 map
.
使用foreachPartition
可以:
- 将转换后的数据转换为
DataFrame
。
- 过滤掉记录。
- 开始制作人。
- 写入卡夫卡。
我有一个 Kafka 生产者:
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('topic', ('12', 'AB DD', 'targer_1', '18'))
producer.send('topic', ('33', 'CC FF', 'target_2', '23'))
Spark 消费者应该处理这个流:
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountw")
ssc = StreamingContext(sc, 4)
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
请帮助我将此流转换为可查询的 JSON 其中键值结构是这样的:
{"A": '12', "B": 'AB DD', "C": 'targer_1', "D": '18'}
我想像这样过滤对象流:
Df.select("A", "C").where("D > 19")
并将其发送回 Kafka。 如果您有任何建议,我很乐意听取。
使用结构化流和完整的 JSON 编码对您来说会容易得多。将数据写入 JSON
from pyspark.sql.functions import from_json, col, to_json
from pyspark.sql.types import *
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(dict(zip(["A", "B", "C", "D"], v))).encode('utf-8')
)
使用 Spark Kafka 阅读 reader(您必须包含 spark-sql-kafka
包):
df = (spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "topic")
.load())
定义模式:
schema = StructType([StructField(c, StringType()) for c in ["A", "B", "C", "D"]])
解析、过滤和写入
(df
# Parse JSON
.select(from_json(col("value").cast("string"), schema).alias("value"))
# Filter
.where(col("value.D").cast("integer") > 19)
# Serialize to JSON
.select(to_json("value").alias("value"))
# And write
.writeStream
.format("kafka")
.option("topic", output_topic)
.option("kafka.bootstrap.servers", brokers)
.option("checkpointLocation", checkpont_directory)
.start())
使用旧的 API 你可以:
- Use 可以使用
valueDecoder
或messageHandler
参数createDirectStream
来解码传入数据。您也可以使用map
. 使用
foreachPartition
可以:- 将转换后的数据转换为
DataFrame
。 - 过滤掉记录。
- 开始制作人。
- 写入卡夫卡。
- 将转换后的数据转换为