如果您在反应式 lambda 构造之间传递有状态对象,您是否需要关心线程可见性?

If you pass around a stateful object between reactive lambda constructs do you need to care about thread visibility?

假设我们正在使用 RxJava 或 Reactor,我们有一个包含几个属性的对象,这些属性可以通过 setter 来改变。假设我们 运行 在不同的线程池上执行某些操作。我们在我们的管道中多次使用这个状态对象并改变它的属性。我们还需要 "volatile" 来确保缓存的版本不被返回吗?或者有一种机制与管道的顺序性质一起确保修改后的对象通过 .flatMap 返回,例如?我们是否需要让这个对象不可变并在每次修改时重新创建?

如果您在多个管道中修改单个对象,请考虑以下事项:

// this example is in Kotlin but should be fairly analagous to java
fun main() {
    var count = 0

    Flowable.range(0, 1000)
        // splits the stream into "parallel pipelines"
        .parallel()
        .runOn(Schedulers.computation())
        .doAfterNext { count += 1 }
        // merge back into a single pipeline
        .sequential()
        .subscribe()

    println(count)
}

结果会有所不同,但在我的计算机上它会打印 600 - 850

中的任何内容

为了示例的缘故,我们可以通过使用 AtomicInteger 代替所用的 Int 来解决此问题的两种方法。更好的办法是不存储任何计数变量,只对每个并行进程执行加法,然后在我们返回一个流后将结果与 sequential 合并,但我不确定这是否适用于你的问题。

回顾一下,如果您要修改任意数量的对象,但一次只能在一个管道中修改,那么您不必担心线程问题,因为修改仅限于单个线程。如果它同时 分成多个管道,那么您确实需要担心这一点,就像通常使用任何多线程应用程序一样。解决方案可能涉及使用 Java 中可用的锁或同步对象,或者可能有一种更具反应性的方法来避免这些问题(比如一次只在一个管道中处理),但是如果没有具体例子。

只要你有从不同线程读取和写入的状态,你就需要使用一种同步机制来确保一个线程"sees"其他线程操作的影响。

如果没有适当的同步,线程可能会看到部分效果或以意外顺序看到效果。

有关详细信息,请参阅 https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4.5

所以你应该确保对你的状态对象的所有访问都正确同步。或者,您可以避免管道内的状态修改。改用不可变数据结构。