如何过滤坏的和好的 json 事件,然后增加坏 json 记录的指标计数并使用 java 将这些记录存储在 apache beam 中
How to filter bad and good json events and then increment metrics count for bad json record and store those record in apache beam using java
我的 PubSub 主题有 json 原始消息事件,我想过滤好的 json record/events 和坏的 json 记录/事件并存储在不同的 PCollections 中。对于每个坏记录计数器指标,应该增加并将日志存储在另一个 PCollections 中,以便稍后我可以检查日志中是否有坏 json 记录。我需要使用哪个 Apache Beam 转换以及如何使用 Java.
使用这些转换
你可以阅读beam programming guide. You will find great solution and pattern for your use case. For example, to filter the good and the bad JSON, you need to create a transform with a standard output (let's say the correct JSON) and an addition output 不好的JSON。
因此,从那里开始,您有 2 个 PCollection。然后独立处理它们。您可以将错误 [=15=] 放入文件、BigQuery 中,或者简单地创建一个转换,在 Cloud Logging 中写入特殊的日志跟踪,以便稍后在另一个进程中获取和处理此日志跟踪(如果需要)。
我的 PubSub 主题有 json 原始消息事件,我想过滤好的 json record/events 和坏的 json 记录/事件并存储在不同的 PCollections 中。对于每个坏记录计数器指标,应该增加并将日志存储在另一个 PCollections 中,以便稍后我可以检查日志中是否有坏 json 记录。我需要使用哪个 Apache Beam 转换以及如何使用 Java.
使用这些转换你可以阅读beam programming guide. You will find great solution and pattern for your use case. For example, to filter the good and the bad JSON, you need to create a transform with a standard output (let's say the correct JSON) and an addition output 不好的JSON。
因此,从那里开始,您有 2 个 PCollection。然后独立处理它们。您可以将错误 [=15=] 放入文件、BigQuery 中,或者简单地创建一个转换,在 Cloud Logging 中写入特殊的日志跟踪,以便稍后在另一个进程中获取和处理此日志跟踪(如果需要)。