Should a Spark broadcast variable's type be a number or a string when I restart a job from a checkpoint?
When I set a collection as a broadcast variable, it always gives me a serialization error. I have tried Map, HashMap and Array, and all of them failed.
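This is a known Spark issue: broadcast variables and accumulators cannot be recovered from a Spark Streaming checkpoint. See https://issues.apache.org/jira/browse/SPARK-5206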
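If the error only appears when the job is restarted from a checkpoint, the collection type is probably not the cause. A quick way to check this is a minimal sketch that round-trips the same kinds of values through plain Java serialization (`java.io.ObjectOutputStream` / `ObjectInputStream`). `SerializationCheck` and `roundTrip` are hypothetical names, and this is not Spark's checkpoint code path; it only shows that `HashMap`, arrays and `Arrays.asList` lists are serializable in principle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: writes a value with Java serialization and reads it back.
// This is NOT Spark's checkpoint code, only a check that the types are serializable.
final class SerializationCheck {

  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value); // throws NotSerializableException for non-serializable types
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> map = new HashMap<>();
    map.put("a", 1);
    String[] array = {"a", "b", "c"};
    List<String> list = Arrays.asList("a", "b", "c");

    System.out.println(map.equals(roundTrip(map)));                        // true
    System.out.println(Arrays.equals(array, (String[]) roundTrip(array))); // true
    System.out.println(list.equals(roundTrip(list)));                      // true
  }
}
```

All three collection types survive the round trip, which is consistent with the failure coming from checkpoint recovery rather than from choosing a number, a string or a particular collection.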
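As a workaround, you can wrap the broadcast variable in a lazily instantiated singleton: it is created on first use, so after the driver restarts it is created again instead of being restored from the checkpoint.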
See https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java for the complete example:
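```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Lazily instantiated singleton: the broadcast variable is created on first use
// instead of being restored from the checkpoint.
class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}
```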
```java
// Excerpt from JavaRecoverableNetworkWordCount; "..." marks omitted code.
public static void main(String[] args) throws Exception {
  ...
  // Recover the context from the checkpoint if one exists, otherwise build a new one
  Function0<JavaStreamingContext> createContextFunc =
      () -> createContext(ip, port, checkpointDirectory, outputPath);
  JavaStreamingContext ssc =
      JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc);
  ssc.start();
  ssc.awaitTermination(); // block until the streaming job stops
}

private static JavaStreamingContext createContext(String ip,
                                                  int port,
                                                  String checkpointDirectory,
                                                  String outputPath) {
  SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
  // 1-second batch interval
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
  ssc.checkpoint(checkpointDirectory);
  ...
  wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast (re-created lazily after a restart)
    Broadcast<List<String>> blacklist =
        JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    ...
  });
  ...
}
```
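The same double-checked locking idea can be written once as a generic holder. This is a minimal, Spark-free sketch under stated assumptions: `LazyInstance` and `LazyInstanceDemo` are hypothetical names, and a plain `String` context stands in for `JavaSparkContext` so the code runs without Spark on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical generalization of JavaWordBlacklist.getInstance: the value is built
// from a context object on the first call and reused on every later call.
final class LazyInstance<C, T> {
  private final Function<C, T> factory;
  private volatile T instance; // volatile makes double-checked locking safe

  LazyInstance(Function<C, T> factory) {
    this.factory = factory;
  }

  T get(C context) {
    T result = instance; // single volatile read on the fast path
    if (result == null) {
      synchronized (this) {
        result = instance;
        if (result == null) {
          result = factory.apply(context); // runs at most once per holder
          instance = result;
        }
      }
    }
    return result;
  }
}

final class LazyInstanceDemo {
  public static void main(String[] args) {
    AtomicInteger builds = new AtomicInteger();
    // The String context is a stand-in for JavaSparkContext (assumption).
    LazyInstance<String, List<String>> blacklist = new LazyInstance<>(ctx -> {
      builds.incrementAndGet();
      return Arrays.asList("a", "b", "c");
    });
    blacklist.get("context");
    blacklist.get("context");
    System.out.println(builds.get()); // prints 1
  }
}
```

The holder starts empty in a freshly started driver JVM, so, assuming it is only reached from code that runs on the driver (as in `foreachRDD` above), the first batch after a restart builds a new value instead of relying on one restored from the checkpoint.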