Spark 闭包行为

Spark closures behavior

我有一个关于 spark 闭包的基本问题。我无法区分场景 2 和 3 之间的代码行为,两者都产生相同的输出,但根据我的理解,场景 3 不应按预期工作。

以下代码适用于所有场景:

class A implements Serializable{
        String t;
        A(String t){
            this.t=t;
        }
    }

//Initiaze spark context
JavaSparkContext context=....
//create rdd
JavaRDD<String> rdd = context.parallelize(Arrays.asList("a","b","c","d","e"),3);

场景 1:不要这样做,因为 A 在驱动程序中初始化并且在执行程序中不可见。

A a=new A("pqr");
rdd.map(i->i+a.t).collect();

场景二:分享对象推荐方式

Broadcast<A> broadCast = context.broadcast(new A("pqr"));
rdd.map(i->broadCast.getValue().t+i).collect();
//output: [pqra, pqrb, pqrc, pqrd, pqre]

场景 3:为什么即使我在驱动程序中启动 A,此代码也能按预期工作?

 class TestFunction implements Function<String, String>, Serializable {
    private A val;
    public TestFunction(){ }
    public TestFunction(A a){
        this.val = a;
    }
    @Override
    public String call(String integer) throws Exception {
        return val.t+integer;
    }
}
    TestFunction mapFunction = new TestFunction(new A("pqr"));
    System.out.println(rdd.map(mapFunction).collect());
    //output: [pqra, pqrb, pqrc, pqrd, pqre]

注意:我是运行集群模式下的程序

场景 1 和场景 3 生成的 Java 字节码几乎相同。使用 Broadcast(场景 2)的好处是广播对象只会被发送给一个执行器一次,并在该执行器上的其他任务中重用它。 Scenerio 1 & 3 总是将对象 A 发送给每个任务的执行者。