如何从 spark 中设置和获取静态变量?

How to set and get static variables from spark?

我有一个 class 是这样的:

public class Test {
    private static String name;

    public static String getName() {
        return name;
    }

    public static void setName(String name) {
        Test.name = name;
    }

    public static void print() {
        System.out.println(name);
    }

}

在我的 Spark 驱动程序中,我这样设置名称并调用 print() 命令:

public final class TestDriver{

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("TestApp");
        // ...
        // ...
        Test.setName("TestName")
        Test.print();
        // ...
    }
}

但是,我收到 NullPointerException。如何将值传递给全局变量并使用它?

驱动程序进程中的 class 副本不是执行程序中的副本。它们不在同一个 ClassLoader 中,甚至不在同一个 JVM 中,甚至不在同一台机器上。在驱动程序上设置静态变量对其他副本没有任何作用,因此您会远程发现它为空。

好的,基本上有两种方法可以将 master 已知的值提供给执行者:

  1. 将值放入闭包中,以序列化到执行程序以执行任务。这是最常见的一种,非常simple/elegant。示例和文档 here.
  2. 用数据创建一个广播变量。这对于大尺寸的不可变数据很有用,所以你要保证它只发送一次。如果一遍又一遍地使用相同的数据也很好。示例和文档 here.

在这两种情况下都不需要使用静态变量。但是,如果您确实希望执行程序 VM 上有可用的静态值,则需要执行以下操作之一:

  1. 如果值是固定的或者配置在执行节点上可用(在 jar 中等),那么你可以有一个惰性 val,保证只初始化一次。
  2. 您可以使用使用上述两个选项之一的代码调用 mapPartitions(),然后将值存储在您的静态 variable/object 中。 mapPartitions 保证每个分区只 运行 一次(比每行一次好得多)并且对这种事情(初始化数据库连接等)很有用。

希望对您有所帮助!

P.S:至于你的例外情况:我只是没有在该代码示例中看到它,我敢打赌它发生在其他地方。


编辑以进一步说明:惰性 val 解决方案只是 Scala,不涉及 Spark...

object MyStaticObject
{
  lazy val MyStaticValue = {
     // Call a database, read a file included in the Jar, do expensive initialization computation, etc
     4
  }
} 

由于每个Executor对应一个JVM,一旦类被加载,MyStaticObject就会被初始化。 lazy 关键字保证 MyStaticValue 变量只会在第一次实际请求时被初始化,并从那以后一直保持它的值。

我想再添加一种方法,只有当您有一些变量在运行时作为参数传递时,这种方法才有意义。

spark 配置 --> --conf "spark.executor.extraJavaOptions=-DcutomField=${value}" 和 当您需要 transformations 中的数据时,您可以调用 System.getProperty("cutomField");

您可以找到更多详细信息here

注意:当我们有大量变量时,上面的讨论没有意义 .在那些情况下,我更喜欢@Daniel Langdon 方法。

我想在丹尼尔的回答中再补充一点

当使用 static 关键字声明变量时,JVM 在 class 加载期间加载它,因此如果您创建一个 jar 并在 Java /scala [=15= 中设置静态字段的初始值] 都存放在罐子里,工人可以直接使用。但是如果你在driver程序中改变一个static field的值,workers只能看到赋值到Jar中的初始值,你改变的值不会反映出来,所以你需要重新copy new jar或者需要copy class手动进入所有执行者。