在 Rx 中保持可观察状态的最佳方式

Optimal way for keeping an observable state in Rx

我有一个问题,关于为了保持可观察状态,更好的 RxJava 模式是什么。

为简单起见,假设我们有一个 StateManager class 需要跟踪系统中的某个状态(假设它是一个简单的布尔标志)并将其暴露在一种可观察的方式。因此,它将有一个如下所示的方法:

class StateManager {
    Observable<Boolean> state();
    ...
}

此管理器的生命周期很长,可能有多个 "clients"(例如视图、其他管理器等)可以随时订阅或取消订阅。状态会根据一些内部事件而改变。

处理这个问题最明显的方法是将状态保存在 BehaviourSubject 消费者直接挂钩的状态中:

class StateManager {

    Subject mStateSubject = BehaviourSubject.create(true);        

    Observable<Boolean> state() {
        return mStateSubject.asObservable();
    }    
    ...
}

有更好的方法吗?

Subjects 可能是使用反应式库最不理想的方式,尽管它确实可以工作。

Functional-reactive 编程在没有状态的情况下效果最好。 Subjects 是一种状态。我建议更改您的代码,以便将 Observable 定义为功能运算符的组合。这使得测试和管理您的可观察对象发出的消息变得容易。


我更像是一名 C# 开发人员,所以我希望您能原谅不同的语法。这是一个例子:

void Main()
{
    var tracker = new AddTracker();
    tracker.getSums().Subscribe(i => Console.WriteLine(i));
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Timestamp()
        .Select(t => t.Timestamp.Second)
        .Take(20)
        .Subscribe(i => tracker.setA(i % 7));

    Observable.Interval(TimeSpan.FromMilliseconds(75))
        .Timestamp()
        .Select(t => t.Timestamp.Millisecond)
        .Take(30)
        .Subscribe(i => tracker.setB(i % 9));

}

public class AddTracker
{
    private readonly ISubject<int> _a;
    private readonly ISubject<int> _b;
    private readonly IObservable<int> _sums;
    private readonly IDisposable _dummySub;

    public AddTracker()
    {
        _a = new BehaviorSubject<int>(0);
        _b = new BehaviorSubject<int>(0);
        _sums = _a
            .CombineLatest(_b, (a, b) => a + b)
            .Replay(1)
            .RefCount();
        _dummySub = _sums.Subscribe(_ => { });
    }

    public void setA(int value)
    {
        _a.OnNext(value);
    }

    public void setB(int value)
    {
        _b.OnNext(value);
    }

    public IObservable<int> getSums()
    {
        return _sums;
    }
}

在 C# 领域,您可以将 _a_b 主题换成事件,这是一个轻微的改进。我知道 Java 中没有 first-class 事件,所以我不确定是否可以翻译。

然而,更根本的是,在 C# 和 Java 中,您应该问的问题是...导致 setAsetB 调用的原因是什么?你能用这个代替它们吗:

void Main()
{
    var aStream = Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Timestamp()
        .Select(t => t.Timestamp.Second)
        .Take(20);

    var bStream = Observable.Interval(TimeSpan.FromMilliseconds(75))
        .Timestamp()
        .Select(t => t.Timestamp.Millisecond)
        .Take(30);

    var tracker = new AddTracker(aStream, bStream);
    tracker.getSums().Subscribe(i => Console.WriteLine(i));

}

public class AddTracker
{
    private readonly IObservable<int> _sums;
    private readonly IDisposable _dummySub;

    public AddTracker(IObservable<int> a, IObservable<int> b)
    {
        _sums = a
            .CombineLatest(b, (aItem, bItem) => (aItem % 9) + (bItem % 7))
            .Replay(1)
            .RefCount();
        _dummySub = _sums.Subscribe(_ => {});
    }

    public IObservable<int> getSums()
    {
        return _sums;
    }
}

简而言之,如果必须,请从科目开始。然后带上你的主题,让他们尽可能远离你的逻辑。

所描述的情况就是所谓的 'Hot' 可观察 - 其生产者(排放源)在订阅之外创建的可观察, (如 Ben Lesh 在 Hot vs Cold Observables 中所述 - 推荐阅读)。

正如 Shlomo 所说,Subjects 是 Rx 世界的 "mutable variables",您可以通过使用 Obsevable.create(它监听事件并产生基于它们的发射),然后使用将其转换为 ConnectableObservable 的运算符(如共享、发布)使其 'hot',以便将其多播给在不同时间订阅的多个观察者。

但是,在这种情况下,由于生产者在您的 class 本地(事件由此 class 生成),因此可以为此目的使用 Subject,因为您的 class 本身就是排放源和产生事件的 mutual/state 变量。 (基于 answer given by Erik Meijer, and this blog post