为什么数据没有添加到地图中,而可以使用相同的函数将数据集打印到控制台。我在这里做错了什么?

Why is the data not getting added to the map while the same function can by used to print dataset to console. What am I doing wrong here?

dataset = dataset.withColumn("Probability", callUDF("checkProb", col("Confirmed"), col("Population")));
    
Map<String, Double> probability= new HashMap<>();
ArrayList<String> a =new ArrayList<>();
dataset= dataset.limit(35);
dataset.show(36);
dataset.foreach((ForeachFunction<Row>) row -> a.add(row.getAs("State").toString()));
                    
System.out.println(a.size());

无论我做什么,尺寸都显示为 0。我已经尝试过 arraylist 和 map 但没有用。

Spark 将工作负载分配给 ArrayList 的不同 . The driver process provides a copy of each local variable for each executor. This copy is independent of the original variable and if an executor alters the copy, the original variable stays unchanged. foreach is run by the executors, and each executor gets its own copy of a. You can see that if you print the identityHashCode:

ArrayList<String> a = new ArrayList<>();
dataset = dataset.limit(35);
dataset.show(36);
System.out.println("a in the driver process: " + System.identityHashCode(a));
dataset.foreach((ForeachFunction<Row>) row -> {
    a.add(row.getAs("value").toString());
    System.out.println("a on an executor " + System.identityHashCode(a));
});
System.out.println("back in the driver process: " + System.identityHashCode(a));

打印

a in the driver process: 1859780907
a on an executor 229101481
a on an executor 2105534525
a on an executor 1982276971
back in the driver process: 1859780907

因此,您调用 size()ArrayList 永远不会改变。

顺便说一句:在执行程序上使用驱动程序的局部变量是一种不好的做法,因为这可能会导致(不仅是性能)问题。您应该考虑使用 broadcast variables and accumulators.