org.apache.spark.SparkException: Task not serializable - JavaSparkContext
I am trying to run the following simple Spark code:
Gson gson = new Gson();
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json");
JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
private static final long serialVersionUID = -78238876849074973L;
@Override
public SupplierDTO call(String str) throws Exception
{
return gson.fromJson(str, SupplierDTO.class);
}
});
But when the stringRdd.map statement is executed, the following error is thrown:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
at org.apache.spark.rdd.RDD.map(RDD.scala:288)
at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
at org.apache.spark.api.java.JavaRDD.map(JavaRDD.scala:32)
at com.demo.spark.processor.cassandra.CassandraDataUploader.uploadData(CassandraDataUploader.java:71)
at com.demo.spark.processor.cassandra.CassandraDataUploader.main(CassandraDataUploader.java:47)
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 7 more
Here 'jsc' is the JavaSparkContext object I am using.
As far as I know, JavaSparkContext is not Serializable and should not be used inside any function that will be sent to Spark workers.
Now, what I cannot understand is: how is an instance of JavaSparkContext being sent to the workers? What should I change in my code to avoid this?
The gson reference is 'pulling' the outer class into the scope of the closure, along with its complete object graph.
In this case, create the gson object inside the closure:
public SupplierDTO call(String str) throws Exception {
Gson gson = new Gson();
return gson.fromJson(str, SupplierDTO.class);
}
You can also declare the Spark context as transient.
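For example, a minimal sketch of how that could look on the driver class from the stack trace (the fields shown here are assumptions based on the question, not the actual class):
public class CassandraDataUploader implements Serializable
{
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so the context is not
    // dragged along when a closure captures the enclosing instance
    private transient JavaSparkContext jsc;
}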
If creating the Gson instance is expensive, consider using mapPartitions instead of map, as sketched below.
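A minimal sketch of that variant, assuming Spark 1.x (where FlatMapFunction.call returns an Iterable; in 2.x+ it returns an Iterator) and the same stringRdd and SupplierDTO as above; imports for Iterator, List, ArrayList and FlatMapFunction are omitted to match the snippets above:
JavaRDD<SupplierDTO> rdd = stringRdd.mapPartitions(new FlatMapFunction<Iterator<String>, SupplierDTO>()
{
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<SupplierDTO> call(Iterator<String> lines) throws Exception
    {
        Gson gson = new Gson(); // created once per partition, on the worker
        List<SupplierDTO> result = new ArrayList<SupplierDTO>();
        while (lines.hasNext())
        {
            result.add(gson.fromJson(lines.next(), SupplierDTO.class));
        }
        return result;
    }
});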
In my case, I solved this problem using one of the following options:
- As mentioned above, by declaring the SparkContext as transient
- You can also try making the gson object static:
static Gson gson = new Gson();
Please refer to Job aborted due to stage failure: Task not serializable for the other options available for solving this problem.
You can use the following code instead of line 9 (return gson.fromJson(str, SupplierDTO.class);):
return new Gson().fromJson(str, SupplierDTO.class); // this is correct
and remove line 1 (Gson gson = new Gson();).
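Put together, a minimal sketch of the map call after this change (this still assumes the enclosing class no longer drags JavaSparkContext into the closure, e.g. via the transient suggestion above):
JavaRDD<String> stringRdd = jsc.textFile("src/main/resources/META-INF/data/supplier.json");
JavaRDD<SupplierDTO> rdd = stringRdd.map(new Function<String, SupplierDTO>()
{
    private static final long serialVersionUID = 1L;

    @Override
    public SupplierDTO call(String str) throws Exception
    {
        // Gson is created inside the task, on the worker,
        // so no Gson field is captured from the driver
        return new Gson().fromJson(str, SupplierDTO.class);
    }
});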