在 Rx 中保持可观察状态的最佳方式
Optimal way for keeping an observable state in Rx
我有一个问题,关于为了保持可观察状态,更好的 RxJava 模式是什么。
为简单起见,假设我们有一个 StateManager
class 需要跟踪系统中的某个状态(假设它是一个简单的布尔标志)并将其暴露在一种可观察的方式。因此,它将有一个如下所示的方法:
class StateManager {
Observable<Boolean> state();
...
}
此管理器的生命周期很长,可能有多个 "clients"(例如视图、其他管理器等)可以随时订阅或取消订阅。状态会根据一些内部事件而改变。
处理这个问题最明显的方法是将状态保存在 BehaviourSubject
消费者直接挂钩的状态中:
class StateManager {
Subject mStateSubject = BehaviourSubject.create(true);
Observable<Boolean> state() {
return mStateSubject.asObservable();
}
...
}
有更好的方法吗?
Subjects
可能是使用反应式库最不理想的方式,尽管它确实可以工作。
Functional-reactive 编程在没有状态的情况下效果最好。 Subjects
是一种状态。我建议更改您的代码,以便将 Observable 定义为功能运算符的组合。这使得测试和管理您的可观察对象发出的消息变得容易。
我更像是一名 C# 开发人员,所以我希望您能原谅不同的语法。这是一个例子:
void Main()
{
var tracker = new AddTracker();
tracker.getSums().Subscribe(i => Console.WriteLine(i));
Observable.Interval(TimeSpan.FromMilliseconds(100))
.Timestamp()
.Select(t => t.Timestamp.Second)
.Take(20)
.Subscribe(i => tracker.setA(i % 7));
Observable.Interval(TimeSpan.FromMilliseconds(75))
.Timestamp()
.Select(t => t.Timestamp.Millisecond)
.Take(30)
.Subscribe(i => tracker.setB(i % 9));
}
public class AddTracker
{
private readonly ISubject<int> _a;
private readonly ISubject<int> _b;
private readonly IObservable<int> _sums;
private readonly IDisposable _dummySub;
public AddTracker()
{
_a = new BehaviorSubject<int>(0);
_b = new BehaviorSubject<int>(0);
_sums = _a
.CombineLatest(_b, (a, b) => a + b)
.Replay(1)
.RefCount();
_dummySub = _sums.Subscribe(_ => { });
}
public void setA(int value)
{
_a.OnNext(value);
}
public void setB(int value)
{
_b.OnNext(value);
}
public IObservable<int> getSums()
{
return _sums;
}
}
在 C# 领域,您可以将 _a
和 _b
主题换成事件,这是一个轻微的改进。我知道 Java 中没有 first-class 事件,所以我不确定是否可以翻译。
然而,更根本的是,在 C# 和 Java 中,您应该问的问题是...导致 setA
和 setB
调用的原因是什么?你能用这个代替它们吗:
void Main()
{
var aStream = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Timestamp()
.Select(t => t.Timestamp.Second)
.Take(20);
var bStream = Observable.Interval(TimeSpan.FromMilliseconds(75))
.Timestamp()
.Select(t => t.Timestamp.Millisecond)
.Take(30);
var tracker = new AddTracker(aStream, bStream);
tracker.getSums().Subscribe(i => Console.WriteLine(i));
}
public class AddTracker
{
private readonly IObservable<int> _sums;
private readonly IDisposable _dummySub;
public AddTracker(IObservable<int> a, IObservable<int> b)
{
_sums = a
.CombineLatest(b, (aItem, bItem) => (aItem % 9) + (bItem % 7))
.Replay(1)
.RefCount();
_dummySub = _sums.Subscribe(_ => {});
}
public IObservable<int> getSums()
{
return _sums;
}
}
简而言之,如果必须,请从科目开始。然后带上你的主题,让他们尽可能远离你的逻辑。
所描述的情况就是所谓的 'Hot' 可观察 - 其生产者(排放源)在订阅之外创建的可观察,
(如 Ben Lesh 在 Hot vs Cold Observables 中所述 - 推荐阅读)。
正如 Shlomo 所说,Subjects 是 Rx 世界的 "mutable variables",您可以通过使用 Obsevable.create(它监听事件并产生基于它们的发射),然后使用将其转换为 ConnectableObservable 的运算符(如共享、发布)使其 'hot',以便将其多播给在不同时间订阅的多个观察者。
但是,在这种情况下,由于生产者在您的 class 本地(事件由此 class 生成),因此可以为此目的使用 Subject,因为您的 class 本身就是排放源和产生事件的 mutual/state 变量。
(基于 answer given by Erik Meijer, and this blog post)
我有一个问题,关于为了保持可观察状态,更好的 RxJava 模式是什么。
为简单起见,假设我们有一个 StateManager
class 需要跟踪系统中的某个状态(假设它是一个简单的布尔标志)并将其暴露在一种可观察的方式。因此,它将有一个如下所示的方法:
class StateManager {
Observable<Boolean> state();
...
}
此管理器的生命周期很长,可能有多个 "clients"(例如视图、其他管理器等)可以随时订阅或取消订阅。状态会根据一些内部事件而改变。
处理这个问题最明显的方法是将状态保存在 BehaviourSubject
消费者直接挂钩的状态中:
class StateManager {
Subject mStateSubject = BehaviourSubject.create(true);
Observable<Boolean> state() {
return mStateSubject.asObservable();
}
...
}
有更好的方法吗?
Subjects
可能是使用反应式库最不理想的方式,尽管它确实可以工作。
Functional-reactive 编程在没有状态的情况下效果最好。 Subjects
是一种状态。我建议更改您的代码,以便将 Observable 定义为功能运算符的组合。这使得测试和管理您的可观察对象发出的消息变得容易。
我更像是一名 C# 开发人员,所以我希望您能原谅不同的语法。这是一个例子:
void Main()
{
var tracker = new AddTracker();
tracker.getSums().Subscribe(i => Console.WriteLine(i));
Observable.Interval(TimeSpan.FromMilliseconds(100))
.Timestamp()
.Select(t => t.Timestamp.Second)
.Take(20)
.Subscribe(i => tracker.setA(i % 7));
Observable.Interval(TimeSpan.FromMilliseconds(75))
.Timestamp()
.Select(t => t.Timestamp.Millisecond)
.Take(30)
.Subscribe(i => tracker.setB(i % 9));
}
public class AddTracker
{
private readonly ISubject<int> _a;
private readonly ISubject<int> _b;
private readonly IObservable<int> _sums;
private readonly IDisposable _dummySub;
public AddTracker()
{
_a = new BehaviorSubject<int>(0);
_b = new BehaviorSubject<int>(0);
_sums = _a
.CombineLatest(_b, (a, b) => a + b)
.Replay(1)
.RefCount();
_dummySub = _sums.Subscribe(_ => { });
}
public void setA(int value)
{
_a.OnNext(value);
}
public void setB(int value)
{
_b.OnNext(value);
}
public IObservable<int> getSums()
{
return _sums;
}
}
在 C# 领域,您可以将 _a
和 _b
主题换成事件,这是一个轻微的改进。我知道 Java 中没有 first-class 事件,所以我不确定是否可以翻译。
然而,更根本的是,在 C# 和 Java 中,您应该问的问题是...导致 setA
和 setB
调用的原因是什么?你能用这个代替它们吗:
void Main()
{
var aStream = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Timestamp()
.Select(t => t.Timestamp.Second)
.Take(20);
var bStream = Observable.Interval(TimeSpan.FromMilliseconds(75))
.Timestamp()
.Select(t => t.Timestamp.Millisecond)
.Take(30);
var tracker = new AddTracker(aStream, bStream);
tracker.getSums().Subscribe(i => Console.WriteLine(i));
}
public class AddTracker
{
private readonly IObservable<int> _sums;
private readonly IDisposable _dummySub;
public AddTracker(IObservable<int> a, IObservable<int> b)
{
_sums = a
.CombineLatest(b, (aItem, bItem) => (aItem % 9) + (bItem % 7))
.Replay(1)
.RefCount();
_dummySub = _sums.Subscribe(_ => {});
}
public IObservable<int> getSums()
{
return _sums;
}
}
简而言之,如果必须,请从科目开始。然后带上你的主题,让他们尽可能远离你的逻辑。
所描述的情况就是所谓的 'Hot' 可观察 - 其生产者(排放源)在订阅之外创建的可观察, (如 Ben Lesh 在 Hot vs Cold Observables 中所述 - 推荐阅读)。
正如 Shlomo 所说,Subjects 是 Rx 世界的 "mutable variables",您可以通过使用 Obsevable.create(它监听事件并产生基于它们的发射),然后使用将其转换为 ConnectableObservable 的运算符(如共享、发布)使其 'hot',以便将其多播给在不同时间订阅的多个观察者。
但是,在这种情况下,由于生产者在您的 class 本地(事件由此 class 生成),因此可以为此目的使用 Subject,因为您的 class 本身就是排放源和产生事件的 mutual/state 变量。 (基于 answer given by Erik Meijer, and this blog post)