原始 java 代码在 Spark 中究竟在哪里执行?

Where exactly the raw java code execute in Spark?

我是 Spark 的新手,我知道 Spark 通常会序列化函数并将其发送给所有执行程序并处理 HDFS 中可用的数据块。但是如果我有下面的代码,

Random random = new Random(); //statement A
int randomValue = random.nextInt(); //statement B
JavaPairRDD<String, Integer> pairRDD = mapRandom.mapToPair(s -> {
    return new Tuple2<>(s, randomValue);
});

AB运行的语句到底在哪里?当然,它不会在每个执行器中执行它,因为每个执行器 运行 都有自己的 JVM,我发现每个值 (s) 都映射到完全相同的 randomValue.

这是否意味着驱动程序中的这些语句AB 运行 并被复制到所有执行程序?

驱动程序上的 A 和 B 运行 并发送给执行程序。

如果你想 运行 它在执行者中,你应该这样做:

JavaPairRDD<String, Integer> pairRDD = mapRandom.mapToPair(s -> {
    Random random = new Random(); //statement A
    int randomValue = random.nextInt(); //statement B
    return new Tuple2<>(s, randomValue);
});

RDD 是分布式的(因此得名)但它在驱动程序代码中定义,因此任何不与 RDD 内容交互的代码都是在驱动程序上执行的代码(根据经验),而任何代码与 RDD 内容有关的内容将在执行程序上 运行。

如您所见,驱动程序会计算您的 randomValue 一次并将其发送给所有执行程序,因为 mapToPair lambda 参数会关闭该计算值。

在 lambda 中(仅)移动 random.nextInt() 调用将触发对分布式集合中的每个元素执行该调用。此外,random 值本身将被序列化并通过网络发送。

将随机创建本身移动到 lambda 中会使其更小(没有外部状态可捕获)但会为分布式集合中的每个元素创建一个新的 Random 实例,这显然不是最优的。

要让每个执行程序有一个随机实例值,您可以使它成为每个JVM/executor初始化一次的静态成员(或惰性单例)。要为每个(比如说)分区设置不同的随机值,您可能应该使用 forEachPartition 或类似的方法,使用 nextInt() 生成一个新值并将该值用于该分区中的所有元素。

查看更高级别的 DataFrame/SQL API,因为您可能会发现更容易实现您想要的内容,甚至不必担心代码的执行位置。