从不适合模式的 Spark Streaming DataFrame 中删除(损坏的)行(来自 Kafka 的传入 JSON 数据)
Remove (corrupt) rows from Spark Streaming DataFrame that don't fit schema (incoming JSON data from Kafka)
我有一个从 Kafka 读入的 spark 结构化 steaming 应用程序。
这是我的代码的基本结构。
我创建了 Spark 会话。
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
然后我从流中读取
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
在 Kafka 记录中,我将 "value" 转换为字符串。它从二进制转换为字符串。此时数据框中有1列
val df = data_stream
.select($"value".cast("string") as "json")
基于预定义的模式,我尝试将 JSON 结构解析为列。但是,这里的问题是如果数据是 "bad" 或不同的格式,那么它与定义的模式不匹配。所以下一个数据框 (df2) 将空值放入列中。
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
我希望能够从 df2 中过滤掉在特定列(我在数据库中用作主键的列)中具有 "null" 的行,即忽略不存在的不良数据与架构不匹配?
编辑:我有点能够做到这一点,但不是我想要的方式。
在我的过程中,我使用了一个使用 .foreach(writer)
过程的查询。它所做的是打开与数据库的连接,处理每一行,然后关闭连接。 structured streaming 的文档提到了此过程所需的必需品。在 process 方法中,我从每一行中获取值并检查我的主键是否为空,如果为空,我不会将其插入数据库。
Kafka 将数据存储为原始字节数组格式。数据生产者和消费者需要就处理的数据结构达成一致。
如果生成的消息格式发生变化,消费者需要调整以阅读相同的格式。当您的数据结构不断发展时,问题就来了,您可能需要在消费者端兼容。
通过Protobuff定义消息格式解决了这个问题。
只需过滤掉任何您不想要的空值:
df2
.filter(row => row("colName") != null)
我有一个从 Kafka 读入的 spark 结构化 steaming 应用程序。 这是我的代码的基本结构。
我创建了 Spark 会话。
val spark = SparkSession
.builder
.appName("app_name")
.getOrCreate()
然后我从流中读取
val data_stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server_list")
.option("subscribe", "topic")
.load()
在 Kafka 记录中,我将 "value" 转换为字符串。它从二进制转换为字符串。此时数据框中有1列
val df = data_stream
.select($"value".cast("string") as "json")
基于预定义的模式,我尝试将 JSON 结构解析为列。但是,这里的问题是如果数据是 "bad" 或不同的格式,那么它与定义的模式不匹配。所以下一个数据框 (df2) 将空值放入列中。
val df2 = df.select(from_json($"json", schema) as "data")
.select("data.*")
我希望能够从 df2 中过滤掉在特定列(我在数据库中用作主键的列)中具有 "null" 的行,即忽略不存在的不良数据与架构不匹配?
编辑:我有点能够做到这一点,但不是我想要的方式。
在我的过程中,我使用了一个使用 .foreach(writer)
过程的查询。它所做的是打开与数据库的连接,处理每一行,然后关闭连接。 structured streaming 的文档提到了此过程所需的必需品。在 process 方法中,我从每一行中获取值并检查我的主键是否为空,如果为空,我不会将其插入数据库。
Kafka 将数据存储为原始字节数组格式。数据生产者和消费者需要就处理的数据结构达成一致。
如果生成的消息格式发生变化,消费者需要调整以阅读相同的格式。当您的数据结构不断发展时,问题就来了,您可能需要在消费者端兼容。
通过Protobuff定义消息格式解决了这个问题。
只需过滤掉任何您不想要的空值:
df2
.filter(row => row("colName") != null)