围绕一组不断变化的依赖可观察对象创建一个可观察对象

Creating an Observable around sets of continously changing depended observables

下面的代码片段是我尝试创建以下功能:

关于我的实现的问题:

更新:这个示例实际上是从 RxJS-Typescript 向后移植的,目的是为了让更广泛的观众了解这个问题。原始版本在使用 Javascript 的单线程浏览器环境中运行的事实应该更清楚为什么这个 "observable capturing" 可能有效(它确实有效并且没有像弄乱 RxJs 内部结构这样的肮脏黑客)。


    class Program
    {
      private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();

      private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
      {
        // wire some traps for capturing any observables "touched" by expression
        expression();

        // return observables touched by expression (not in this example of course)
        if (observableDependencies.Count > 0)
          return observableDependencies.Dequeue();

        return new[] {Observable.Never<Unit>()}; // keep going
      }

      private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
      {
        return Observable.Create<Unit>(observer =>
        {
          var isComplete = new Subject<Unit>();
          var isAborted = false;

          var disp = Scheduler.Default.Schedule(self =>
          {
            Console.WriteLine("** Next iteration at {0}", sw.Elapsed);

            capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
            {
              observer.OnNext(Unit.Default);

              // self-destruct
              isComplete.OnNext(Unit.Default);
            },
            () =>
            {
              Console.WriteLine("completed");

              if (!isAborted)
                self();
            });

            capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
          });

          return new CompositeDisposable(Disposable.Create(() =>
          {
            isAborted = true;

            // self-destruct
            isComplete.OnNext(Unit.Default);
          }), disp);
        });
      }

      private static void Main(string[] args)
      {
        var sw = new Stopwatch();
        sw.Start();

        observableDependencies.Enqueue(new[]
        {
          Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
        });

        observableDependencies.Enqueue(new[]
        {
          Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
          Observable.Return(10).Select(x => Unit.Default)
        });

        observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});

        var capturedObservables = new Subject<IObservable<Unit>[]>();
        var obs = CreateObservable(capturedObservables, sw);

        var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
        Console.ReadLine();

        disp.Dispose();
        Console.ReadLine();
      }
    }

要回答您的第一个问题,SelectMany 是必需的,因为您有一个 3 级深度可观察对象:Observable 数组的主题。 Merge 只压平一层。 SelectMany 只是 Select + Merge 的简写。所以 SelectMany.Merge 正在应用 2 个展平运算符,这正是您所需要的。

第二个答案...看起来你可以只使用 Merge + FirstOrDefault + Defer + Repeat 甚至不使用主题:

var disp = Observable
    .Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
        .Merge()
        .FirstOrDefault(Unit.Default))
    .Repeat()
    .Subscribe(...);

Defer每次订阅都运行捕获函数

Merge 展平 observables 数组

FirstOrDefault 一旦任何 observables 产生一个值就结束流。如果所有这些都完成但没有产生值,那么它会产生一个你可以观察到的 Unit.Default

Repeat 在结束时重新订阅(由于 FirstOrDefault),这会触发另一次捕获(由于 Defer)。

转换回 TypeScript 显然很简单...

CreateObservable 的最终版本或多或少受到了 Brandon 的建议的启发。这表明 99% 的时间你认为你必须求助于安排事情,你做错了™

private static IObservable<Unit> CreateObservable()
{
  return Observable.Create<Unit>(observer =>
  {
    var innerDisp = Observable.Defer(() =>
    {
      return Observable.Merge(
        EvaluateExpressionAndCaptureTouchedObservables(() => false))
      .Take(1);  // done when any observable produces a value
    })
    .Repeat()
    .Subscribe(x =>
    {
      observer.OnNext(Unit.Default);
    });

    return innerDisp;
  });
}