如何捕获 Apache Beam 中内置转换抛出的异常(在本例中为 JSON 解析)

Howto catch exceptions thrown by built-in transform in Apache Beam (in this case JSON Parsing)

我的管道如下(其中StringToKVTransForm、kafkaoutput、kafkainput是我在别处创建或配置的转换;这里重点是ParseJsons,因为它是内置转换

try {
    PCollection<MyClass> myObjects = p
        .apply(kafkaInput.withoutMetadata())
        .apply(Values.create())
        .apply(ParseJsons.of(MyClass.class)).setCoder(SerializableCoder.of(MyClass.class))
        .apply(AsJsons.of(MyClass.class))
        .apply(new StringToKvTransform())
        .apply(kafkaOutput);
    } catch (Throwable e){
        log.info("Unexpected error", e);
    }
    log.info("pipeline initialized");
    p.run().waitUntilFinish();
}

这里的问题是,由于各种原因,我得到的数据可能并不总是正确的 json 格式;不幸的是,这会导致整个管道崩溃

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to parse a path.to.MyClass from JSON value: { "myIncorrectJsonString" }

在这种情况下,我希望我的管道继续运行并忽略不正确的输入事件,但是,我不明白如何...

原因是这是一个内置的转换(ParseJsons),它似乎把错误抛到了我无法控制的地方,导致整个程序崩溃。

All the 我看过的教程建议捕获转换中的错误,这显然不是这里的选项。

我的 goto 解决方案是扩展 ParseJsons class 并捕获错误,但它有一个私有构造函数,因此无法扩展。

任何想法,或者我是否必须编写自己的 ParseJsons 转换 class?

不幸的是,我认为他们没有任何干净的方法来做到这一点。但是,如果您打算编写自己的转换,那么如果您可以增强 ParseJsons 以添加可选的无效 json 输出流,那就太好了。它可能在一般情况下有用。

我只是想补充一下 Ankur 之前所说的内容,BEAM-5638 中已经完成了一些工作来添加异常处理,但还没有完全 done/merged JSON ] 变换。

编辑:Exceptions handling for Json transforms was recently merged 它将很快在 Beam 2.17 版本中可用。