org.json.JSONObject 的 Apache Beam 编码器
Apache Beam Coder for org.json.JSONObject
我正在 Apache Beam 中编写一个数据管道,它从 Pub/Sub 中读取,将消息反序列化为 JSONObjects 并将它们传递到其他一些管道阶段。问题是,当我尝试提交代码时出现以下错误:
An exception occured while executing the Java class. Unable to return a default Coder for Convert to JSON and obfuscate PII data/ParMultiDo(JSONifyAndObfuscate).output [PCollection]. Correct one of the following root causes:
[ERROR] No Coder has been manually specified; you may do so using .setCoder().
[ERROR] Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.json.JSONObject.
[ERROR] Building a Coder using a registered CoderProvider failed.
[ERROR] See suppressed exceptions for detailed failures.
[ERROR] Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
基本上错误是说 Beam 找不到 org.json.JSONObject 对象的编码器。我不知道从哪里可以获得这样的编码器或如何构建一个。有什么想法吗?
谢谢!
了解编码器的最佳起点是 Beam 编程指南:Data Encoding and Type Safety。简短的版本是编码器用于指定如何在 Beam 管道中的某些点(通常在阶段边界)将不同类型的数据编码到字节字符串和从字节字符串编码。不幸的是,默认情况下 JSONObjects 没有编码器,因此您有两个选择:
避免在 PCollections 中创建 JSON对象。您可以从 JSON 中提取所需的数据,然后将其作为基本数据类型传递,或者让您自己的 class 封装您需要的数据,而不是在整个管道中传递 JSON 对象。 Java 的基本数据类型都分配了默认编码器,并且可以很容易地为 classes 生成编码器,这些编码器只是这些类型的结构。作为一个附带的好处,这就是 Beam 管道的预期构建方式,因此如果您尽可能坚持使用基本数据和知名编码器,它可能会更优化地工作。
如果需要 JSON对象,您需要为它们创建自定义编码器。编程指南包含有关如何将自定义编码器设置为默认编码器的信息。对于实现本身,使用 JSONObject 的最简单方法是使用 JSONObject.toString 将其编码为 JSON 字符串,然后使用 JSONObject 的字符串构造函数从字符串中对其进行解码。有关如何执行此操作的详细信息,请查看上面的编程指南并查看 Coder documentation。
我正在 Apache Beam 中编写一个数据管道,它从 Pub/Sub 中读取,将消息反序列化为 JSONObjects 并将它们传递到其他一些管道阶段。问题是,当我尝试提交代码时出现以下错误:
An exception occured while executing the Java class. Unable to return a default Coder for Convert to JSON and obfuscate PII data/ParMultiDo(JSONifyAndObfuscate).output [PCollection]. Correct one of the following root causes: [ERROR] No Coder has been manually specified; you may do so using .setCoder(). [ERROR] Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.json.JSONObject. [ERROR] Building a Coder using a registered CoderProvider failed. [ERROR] See suppressed exceptions for detailed failures. [ERROR] Using the default output Coder from the producing PTransform failed: PTransform.getOutputCoder called.
基本上错误是说 Beam 找不到 org.json.JSONObject 对象的编码器。我不知道从哪里可以获得这样的编码器或如何构建一个。有什么想法吗?
谢谢!
了解编码器的最佳起点是 Beam 编程指南:Data Encoding and Type Safety。简短的版本是编码器用于指定如何在 Beam 管道中的某些点(通常在阶段边界)将不同类型的数据编码到字节字符串和从字节字符串编码。不幸的是,默认情况下 JSONObjects 没有编码器,因此您有两个选择:
避免在 PCollections 中创建 JSON对象。您可以从 JSON 中提取所需的数据,然后将其作为基本数据类型传递,或者让您自己的 class 封装您需要的数据,而不是在整个管道中传递 JSON 对象。 Java 的基本数据类型都分配了默认编码器,并且可以很容易地为 classes 生成编码器,这些编码器只是这些类型的结构。作为一个附带的好处,这就是 Beam 管道的预期构建方式,因此如果您尽可能坚持使用基本数据和知名编码器,它可能会更优化地工作。
如果需要 JSON对象,您需要为它们创建自定义编码器。编程指南包含有关如何将自定义编码器设置为默认编码器的信息。对于实现本身,使用 JSONObject 的最简单方法是使用 JSONObject.toString 将其编码为 JSON 字符串,然后使用 JSONObject 的字符串构造函数从字符串中对其进行解码。有关如何执行此操作的详细信息,请查看上面的编程指南并查看 Coder documentation。