使用 PySpark Structured Streaming 在不重新启动会话的情况下使用 Kafka 源从模式中添加/删除列
Adding / removing columns from schema with Kafka source without restarting the session using PySpark Structured Streaming
我使用的是 Pyspark 3.2.0,我是结构化流媒体的新手,找不到这个问题的答案。
我想使用如下预定义模式从 Kafka 主题读取 json 数据(省略与初始化/连接相关的代码):
# The skeleton schema is defined in 'schema.py'
skeleton_schema = get_skeleton_schema()
df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json("value", skeleton_schema).alias("data")) \
.select(col("data.*"))
...
df.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime='5 minutes') \
.start()
df.awaitTermination()
我希望能够修改 'schema.py' 文件中的 skeleton_schema(例如 add/remove 列)并将这些更改反映到未来的触发器中。有没有办法做到这一点?如果不是,是否有不同的机制可以在不重新启动会话的情况下更新模式?
除非 get_skeleton_schema()
函数本身是每个批处理 运行 且未缓存(例如,调用外部 REST API、数据库或解析某些文件),否则它不会在显示的代码中,那么不,不可能在运行时更改它。
请记住,无法保证运行同一批次中的所有记录都具有相同的架构....
您需要将列作为字节使用,然后使用 ForEachWriter 实现来实现它,但我对 pyspark 不够熟悉,无法举个例子
根据您实际要将数据写入的位置(不是控制台,例如使用 Mongo 或 Snowflake),您可以考虑使用 Kafka Connect,然后使用 Avro 或 Protobuf 序列化而不是 JSON。然后您的生产者将决定何时以向后兼容的方式 introduce/remove 列,由模式注册表强制执行,并且您的消费者不必自己更改或定义任何模式
我使用的是 Pyspark 3.2.0,我是结构化流媒体的新手,找不到这个问题的答案。
我想使用如下预定义模式从 Kafka 主题读取 json 数据(省略与初始化/连接相关的代码):
# The skeleton schema is defined in 'schema.py'
skeleton_schema = get_skeleton_schema()
df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json("value", skeleton_schema).alias("data")) \
.select(col("data.*"))
...
df.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime='5 minutes') \
.start()
df.awaitTermination()
我希望能够修改 'schema.py' 文件中的 skeleton_schema(例如 add/remove 列)并将这些更改反映到未来的触发器中。有没有办法做到这一点?如果不是,是否有不同的机制可以在不重新启动会话的情况下更新模式?
除非 get_skeleton_schema()
函数本身是每个批处理 运行 且未缓存(例如,调用外部 REST API、数据库或解析某些文件),否则它不会在显示的代码中,那么不,不可能在运行时更改它。
请记住,无法保证运行同一批次中的所有记录都具有相同的架构....
您需要将列作为字节使用,然后使用 ForEachWriter 实现来实现它,但我对 pyspark 不够熟悉,无法举个例子
根据您实际要将数据写入的位置(不是控制台,例如使用 Mongo 或 Snowflake),您可以考虑使用 Kafka Connect,然后使用 Avro 或 Protobuf 序列化而不是 JSON。然后您的生产者将决定何时以向后兼容的方式 introduce/remove 列,由模式注册表强制执行,并且您的消费者不必自己更改或定义任何模式