HashMap 作为 Spark Streaming 中的广播变量?

HashMap as a Broadcast Variable in Spark Streaming?

我有一些数据需要在spark streaming中分类。分类键值在程序开始时加载到 HashMap 中。因此,每个传入的数据包都需要与这些密钥进行比较并相应地标记。

我知道 spark 有称为广播变量和累加器的变量来分发对象。教程中的示例使用简单的变量,例如 etc。

如何使用 HashMap 在所有 spark worker 上共享我的 HashMap。或者,有更好的方法吗?

我正在 Java 中编写我的 spark 流应用程序。

在 spark 中,您可以用相同的方式广播任何可序列化的对象。这是最好的方法,因为您只需将数据发送给工作人员一次,然后就可以在任何任务中使用它。

斯卡拉:

val br = ssc.sparkContext.broadcast(Map(1 -> 2))

Java:

Broadcast<HashMap<String, String>> br = ssc.sparkContext().broadcast(new HashMap<>());

这是一个更好的示例,说明如何在 Java 中广播 HashMap:

在您的 Spark 应用程序中,您将创建或加载一个 HashMap。 然后使用 Sparksession 广播那个 HashMap。

HashMap<String,String> bcMap = new HashMap();
bcMap.put("key1","val1");
bcMap.put("key2","val2");

Broadcast<HashMap> bcVar = this.sparkSession.sparkContext().broadcast(bncFlowConflg, classTag(HashMap.class));

您需要以下 class 来创建 class 标签:

private static <T> ClassTag<T> classTag(Class<T> clazz) {
    return scala.reflect.ClassManifestFactory.fromClass(clazz);
}

并且可以参考Spark函数中的广播,比如map如下:

HashMap<String,String> bcVal = bcVar .getValue();