Checkpointing SQLContext NullPointerException issue
I am using checkpointing in my application, and when my application restarts after a failure, I get a NullPointerException on the SQLContext.
I assume the application is not able to recover the SQLContext because of a serialization/deserialization issue. Isn't SQLContext serializable?
Below is my code:
// DriverClass
final JavaSparkContext javaSparkCtx = new JavaSparkContext(conf);
final SQLContext sqlContext = new SQLContext(javaSparkCtx);
JavaStreamingContextFactory javaStreamingContextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() { // only executed the first time
        JavaStreamingContext jssc = new JavaStreamingContext(javaSparkCtx, Durations.minutes(1));
        jssc.checkpoint(CHECKPOINT_DIRECTORY);
        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "abc.xyz.localdomain:6667");
        //....
        JavaDStream<String> fullMsg = messages.map(new MapFunction());
        fullMsg.foreachRDD(new SomeClass(sqlContext));
        return jssc;
    }
};
// Closure Class
public class SomeClass implements Serializable, Function<JavaRDD<String>, Void> {
    SQLContext sqlContext;

    public SomeClass(SQLContext sqlContext) {
        this.sqlContext = sqlContext;
    }

    public void doSomething() {
        this.sqlContext.createDataFrame(); // here is the NullPointerException
    }
    //.......
}
SQLContext is serializable because Spark SQL needs to use the SQLContext internally on the executor side. However, you should not serialize it into the Streaming checkpoint. Instead, you should get it from the RDD, like this: SQLContext sqlContext = SQLContext.getOrCreate(rdd.context()); (see the sketch below).
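For example, SomeClass could fetch the context inside its call method rather than holding it as a serialized field. This is a minimal sketch against the Spark 1.6 Java API, not your exact code; the commented-out createDataFrame call stands in for whatever you actually do with the context:

import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SQLContext;

// Closure class with no SQLContext field, so nothing SQL-related is
// captured in the streaming checkpoint.
public class SomeClass implements Serializable, Function<JavaRDD<String>, Void> {
    @Override
    public Void call(JavaRDD<String> rdd) {
        // Recover (or lazily create) the SQLContext from the SparkContext
        // backing this RDD instead of deserializing it from the checkpoint.
        SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
        // ... use sqlContext here, e.g. sqlContext.createDataFrame(rows, schema);
        return null;
    }
}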
For more details, see the streaming documentation: http://spark.apache.org/docs/1.6.1/streaming-programming-guide.html#dataframe-and-sql-operations