如何捕获 Apache Beam 中内置转换抛出的异常(在本例中为 JSON 解析)
Howto catch exceptions thrown by built-in transform in Apache Beam (in this case JSON Parsing)
我的管道如下(其中StringToKVTransForm、kafkaoutput、kafkainput是我在别处创建或配置的转换;这里重点是ParseJsons,因为它是内置转换
try {
PCollection<MyClass> myObjects = p
.apply(kafkaInput.withoutMetadata())
.apply(Values.create())
.apply(ParseJsons.of(MyClass.class)).setCoder(SerializableCoder.of(MyClass.class))
.apply(AsJsons.of(MyClass.class))
.apply(new StringToKvTransform())
.apply(kafkaOutput);
} catch (Throwable e){
log.info("Unexpected error", e);
}
log.info("pipeline initialized");
p.run().waitUntilFinish();
}
这里的问题是,由于各种原因,我得到的数据可能并不总是正确的 json 格式;不幸的是,这会导致整个管道崩溃
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to parse a path.to.MyClass from JSON value: {
"myIncorrectJsonString"
}
在这种情况下,我希望我的管道继续运行并忽略不正确的输入事件,但是,我不明白如何...
原因是这是一个内置的转换(ParseJsons),它似乎把错误抛到了我无法控制的地方,导致整个程序崩溃。
All the 我看过的教程建议捕获转换中的错误,这显然不是这里的选项。
我的 goto 解决方案是扩展 ParseJsons class 并捕获错误,但它有一个私有构造函数,因此无法扩展。
任何想法,或者我是否必须编写自己的 ParseJsons 转换 class?
不幸的是,我认为他们没有任何干净的方法来做到这一点。但是,如果您打算编写自己的转换,那么如果您可以增强 ParseJsons 以添加可选的无效 json 输出流,那就太好了。它可能在一般情况下有用。
我只是想补充一下 Ankur 之前所说的内容,BEAM-5638 中已经完成了一些工作来添加异常处理,但还没有完全 done/merged JSON ] 变换。
编辑:Exceptions handling for Json transforms was recently merged 它将很快在 Beam 2.17 版本中可用。
我的管道如下(其中StringToKVTransForm、kafkaoutput、kafkainput是我在别处创建或配置的转换;这里重点是ParseJsons,因为它是内置转换
try {
PCollection<MyClass> myObjects = p
.apply(kafkaInput.withoutMetadata())
.apply(Values.create())
.apply(ParseJsons.of(MyClass.class)).setCoder(SerializableCoder.of(MyClass.class))
.apply(AsJsons.of(MyClass.class))
.apply(new StringToKvTransform())
.apply(kafkaOutput);
} catch (Throwable e){
log.info("Unexpected error", e);
}
log.info("pipeline initialized");
p.run().waitUntilFinish();
}
这里的问题是,由于各种原因,我得到的数据可能并不总是正确的 json 格式;不幸的是,这会导致整个管道崩溃
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to parse a path.to.MyClass from JSON value: { "myIncorrectJsonString" }
在这种情况下,我希望我的管道继续运行并忽略不正确的输入事件,但是,我不明白如何...
原因是这是一个内置的转换(ParseJsons),它似乎把错误抛到了我无法控制的地方,导致整个程序崩溃。
All the 我看过的教程建议捕获转换中的错误,这显然不是这里的选项。
我的 goto 解决方案是扩展 ParseJsons class 并捕获错误,但它有一个私有构造函数,因此无法扩展。
任何想法,或者我是否必须编写自己的 ParseJsons 转换 class?
不幸的是,我认为他们没有任何干净的方法来做到这一点。但是,如果您打算编写自己的转换,那么如果您可以增强 ParseJsons 以添加可选的无效 json 输出流,那就太好了。它可能在一般情况下有用。
我只是想补充一下 Ankur 之前所说的内容,BEAM-5638 中已经完成了一些工作来添加异常处理,但还没有完全 done/merged JSON ] 变换。
编辑:Exceptions handling for Json transforms was recently merged 它将很快在 Beam 2.17 版本中可用。