组合相关的可观察量

Combining dependent observables

我有一个容器 class Project,里面有很多 IItem。在项目启动之前,可以向其添加项目。除非停止,否则无法再向其添加项目。

每个项目都可以通过其 IsActive 属性 依次激活和停用。

public interface IItem 
{
    bool IsActive { get; set;}
}

public interface IFoo : IItem
{
}

public class Foo : IFoo
{
    public bool IsActive { get; set;}
}

public interface IBar : IItem
{ }

public class Bar : IBar
{ 
    public bool IsActive { get; set;}
}

public class Project
{
    public Project(params IItem [] items)
    {
        Items = new List<IItem>(items);
    }

    public List<IItem> Items { get;}
}

我还有两个可观察对象,一个用于项目状态,一个用于任何项目的更改。为了这个例子的目的,我用主题模拟了这些

var projectIsRunningObservable = new Subject<bool>();
var projectItemChangedObservable = new Subject<IItem>();

我正在尝试创建一个 IObservable<bool> 来发送一个值,该值指示是否(至少有一个项目处于活动状态并且项目已启动)。如果有活动项目并且项目已停止,它应该推送 false 值。

这是我目前的情况:

void Main()
{
    var bar1 = new Bar();   
    var bar2 = new Bar();   

    var foo1 = new Foo();
    var foo2 = new Foo();

    var projectIsRunningObservable = new Subject<bool>();
    var projectItemChangedObservable = new Subject<IItem>();

    var project = new Project(
        bar1, bar2, foo1, foo2);


    var observable = Observable.Create<bool>(obs =>
                {
                   IList<IItem> items = null;

                    var stateObservable = projectIsRunningObservable.StartWith(false).Subscribe(
                    (state) =>
                    {
                        if (!state)
                        {
                            items = null;
                            obs.OnNext(false);
                        }
                        else
                        {
                            items = project.Items.ToList();
                            obs.OnNext(items != null && items.Any(i => i.IsActive));
                        }
                    },
                    ex => obs.OnError(ex),
                    () => obs.OnCompleted());

                    var itemChangedObservable = projectItemChangedObservable.Subscribe(
                    x =>
                    {
                        obs.OnNext(items != null && items.Any(i => i.IsActive));
                    }
                    ,
                    ex => obs.OnError(ex),
                    () => obs.OnCompleted());

                    return new CompositeDisposable(stateObservable, itemChangedObservable);
                });


    var subscr = observable.Subscribe(Console.WriteLine);

    Console.WriteLine("Change bar1");
    bar1.IsActive = true;
    projectItemChangedObservable.OnNext(bar1);

    Console.WriteLine("Change bar2");
    bar2.IsActive = true;
    projectItemChangedObservable.OnNext(bar2);

    Console.WriteLine("Change foo1");
    foo1.IsActive = true;
    projectItemChangedObservable.OnNext(foo1);

    Console.WriteLine("Change foo2");
    foo2.IsActive = true;
    projectItemChangedObservable.OnNext(foo2);

    // Start project

    Console.WriteLine("Starting project");
    projectIsRunningObservable.OnNext(true);

    Console.WriteLine("Change bar1");
    bar1.IsActive = false;
    projectItemChangedObservable.OnNext(bar1);

    Console.WriteLine("Change bar2");
    bar2.IsActive = false;
    projectItemChangedObservable.OnNext(bar2);

    Console.WriteLine("Change foo1");
    foo1.IsActive = false;
    projectItemChangedObservable.OnNext(foo1);

    Console.WriteLine("Change foo2");
    foo2.IsActive = false;
    projectItemChangedObservable.OnNext(foo2);

    Console.WriteLine("Change foo2 back to true");
    foo2.IsActive = true;
    projectItemChangedObservable.OnNext(foo2);

    // Stop project
    Console.WriteLine("Stopping project");
    projectIsRunningObservable.OnNext(false);

    Console.WriteLine("Change bar1");
    bar1.IsActive = true;
    projectItemChangedObservable.OnNext(bar1);

    Console.WriteLine("Change bar2");
    bar2.IsActive = true;
    projectItemChangedObservable.OnNext(bar2);

    Console.WriteLine("Change foo1");
    foo1.IsActive = true;
    projectItemChangedObservable.OnNext(foo1);

    Console.WriteLine("Change foo2");
    foo2.IsActive = true;
    projectItemChangedObservable.OnNext(foo2);
}

这可行,但是我不确定这是否是最好的方法,也不确定是否可以发送多个 OnErrorOnCompleted 通知。

这是 Observable.Create 的一个很好的用例。您应该将 Subject 重构为 Observable,并使用 TestScheduler class 进行测试。

后一个问题的答案可以在 Rx Design Guidelines (PDF) 中找到。第 6.2 章指出 Observable.Create 提供了多种保护以使序列遵循 Rx 契约。

When the observable sequence has finished (either by firing OnError or Oncompleted), any subscription will automatically be unsubscribed. Any subscribed observer instance will only see a single OnError or OnCompleted message. No more messages are sent through. This ensures the Rx grammar of OnNext* (OnError|OnCompleted)?

注意:在指南示例中,他们使用 Observable.CreateWithDisposable。在最新的 Rx 版本中,它已被重构为 Observable.Create 的重载,您可能知道:)