是否可以在不使用 spark 中的累加器的情况下创建可变共享数据结构?

Is it possible to create a mutable shared data structure without using accumulators in spark?

我是 spark 的新手,有些事情我不太清楚。但是基本知识表明,只有累加器是可变变量,可以跨执行程序更新,并且它的值可以由驱动程序检索。在代码中初始化的任何其他变量,它们在执行程序之间更新更新的值不会中继回驱动程序,因为它们是单独的 JVM。

我正在处理一个项目的一部分,该项目将来自 zookeeper 的偏移量存储在数据结构中以供将来使用。由于偏移量是在执行程序上获得的,所以几乎不可能有一个共享数据结构来将每个分区的偏移量更新回驱动程序,就像 well.That 一样,直到我在 https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html 中遇到这段代码。

AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(rdd -> { 
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    offsetRanges.set(offsets);    return rdd;
}).map(
    ...
    ).foreachRDD(rdd -> {    for (OffsetRange o : offsetRanges.get()) {
        System.out.println(
            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
        );}    
        ...
    });
System.out.println(Arrays.toString(offsetRanges.get()));

这与基本理论相矛盾,因为当我在我的驱动程序中访问 AtomicReference<OffsetRange[]> offsetRanges 的值时,我得到了正确的更新值(在执行程序代码中的 transformToPair 方法中更新),即使它应该 return 我一个空的或空的回应。有人可以向我解释这种行为吗?

Is it possible to create a mutable shared data structure without using accumulators in spark?

没有

This contradicts the underlying theory as when I access the value of

不是,因为驱动程序外部没有修改该值。 transformToPair 的闭包是在驱动程序而不是执行程序上执行的。

因此 offsetRanges.set(offsets) 在原始 offsetRanges 值所在的同一 JVM 上执行。