Spark 关闭问题中 Jackson ObjectNode 的 NotSerializableException
NotSerializableException for Jackson ObjectNode in Spark closure issue
假设我有以下 Java 映射到 Jackson 完整数据绑定的对象:
public class Student implements Serializable{
private ObjectNode name; // two keys: "first_name", "last_name"
// getter and setter ...
}
我有以下 Spark 代码,由于作用域不同,它尝试序列化 Student
类型的闭包变量 student
。
class A(student : Student) extends Serializable {
def process(input: DataFrame): Unit = {
val test = input.map { a =>
print(student)
}
}
}
出现以下错误:Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
我明白为什么会出现这样的错误。基本上,Spark 将尝试序列化所有超出范围的变量 a.k.a。闭包并将其传递给每个执行者。但由于 ObjectNode 本身不是 Serializable
,执行程序无法获取 Student
个实例。
我的问题是,有什么方法可以解决这个问题?
我曾尝试使用 Map<String, String>
而不是 ObjectNode
,但由于 ObjectNode
的 put
和 set
只能有 "primitives"和 JsonNode
作为值,当我尝试这样的事情时它会导致错误:
ObjectNode meta_info = JsonNodeFactory.instance.objectNode();
meta_info.set("field name", student.getName());
有几个选项。
如果您只需要对象节点用于 json 序列化目的,那么您可以重写 Student
class 并完全删除 ObjectNode
。在您的示例中,您可以使用 firstName
和 lastName
字段
将其替换为对象
class Name implements Serializable {
String firstName;
String lastName;
}
但是,如果这不可能,您可以像这样进行自定义序列化
public class Student implements Serializable {
private transient ObjectNode name;
private void writeObject(ObjectOutputStream out) throws IOException {
ObjectMapper mapper = new ObjectMapper();
out.writeUTF(mapper.writeValueAsString(name));
// other fields here
}
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(in.readUTF());
if (!node.isObject()) {
throw new IOException("malformed name field detected");
}
name = (ObjectNode) node;
// read other fields
}
}
在我的示例中,我将对象节点序列化为 json 字符串,但您当然可以遍历对象节点字段,分别存储每个字段。
您可以在 ObjectOutputStream
javadoc 中阅读有关自定义序列化的更多信息。
您也可以尝试不同的 data serializers like Kryo。
我最终将 student
设为 Map<String, String>
对象,并在需要时将其设为 mapper.convertValue(student, ObjectNode.class
ObjectNode
升级到 jackson-databind 到 10.x 有帮助,参考这个 https://github.com/FasterXML/jackson-databind/issues/18
假设我有以下 Java 映射到 Jackson 完整数据绑定的对象:
public class Student implements Serializable{
private ObjectNode name; // two keys: "first_name", "last_name"
// getter and setter ...
}
我有以下 Spark 代码,由于作用域不同,它尝试序列化 Student
类型的闭包变量 student
。
class A(student : Student) extends Serializable {
def process(input: DataFrame): Unit = {
val test = input.map { a =>
print(student)
}
}
}
出现以下错误:Caused by: java.io.NotSerializableException: com.fasterxml.jackson.databind.node.ObjectNode
我明白为什么会出现这样的错误。基本上,Spark 将尝试序列化所有超出范围的变量 a.k.a。闭包并将其传递给每个执行者。但由于 ObjectNode 本身不是 Serializable
,执行程序无法获取 Student
个实例。
我的问题是,有什么方法可以解决这个问题?
我曾尝试使用 Map<String, String>
而不是 ObjectNode
,但由于 ObjectNode
的 put
和 set
只能有 "primitives"和 JsonNode
作为值,当我尝试这样的事情时它会导致错误:
ObjectNode meta_info = JsonNodeFactory.instance.objectNode();
meta_info.set("field name", student.getName());
有几个选项。
如果您只需要对象节点用于 json 序列化目的,那么您可以重写 Student
class 并完全删除 ObjectNode
。在您的示例中,您可以使用 firstName
和 lastName
字段
class Name implements Serializable {
String firstName;
String lastName;
}
但是,如果这不可能,您可以像这样进行自定义序列化
public class Student implements Serializable {
private transient ObjectNode name;
private void writeObject(ObjectOutputStream out) throws IOException {
ObjectMapper mapper = new ObjectMapper();
out.writeUTF(mapper.writeValueAsString(name));
// other fields here
}
private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(in.readUTF());
if (!node.isObject()) {
throw new IOException("malformed name field detected");
}
name = (ObjectNode) node;
// read other fields
}
}
在我的示例中,我将对象节点序列化为 json 字符串,但您当然可以遍历对象节点字段,分别存储每个字段。
您可以在 ObjectOutputStream
javadoc 中阅读有关自定义序列化的更多信息。
您也可以尝试不同的 data serializers like Kryo。
我最终将 student
设为 Map<String, String>
对象,并在需要时将其设为 mapper.convertValue(student, ObjectNode.class
ObjectNode
升级到 jackson-databind 到 10.x 有帮助,参考这个 https://github.com/FasterXML/jackson-databind/issues/18