围绕一组不断变化的依赖可观察对象创建一个可观察对象
Creating an Observable around sets of continously changing depended observables
下面的代码片段是我尝试创建以下功能:
- 创建订阅主题集合的可观察序列
- 当集合中的一个主题产生一个值时,序列结束,调用一个方法 returns 一组新的主题并从 1 重新开始。
- 当对外部可观察对象的订阅被处理掉时,整个事情就停止了
关于我的实现的问题:
- 为什么使用 subjectsSub.SelectMany(x => x).Merge() 但不使用 subjectsSub.Merge() ? (我本来希望后一种方法起作用)
- 在庞大的 Rx 功能库中是否有更简单、更优雅的解决方案?
更新:这个示例实际上是从 RxJS-Typescript 向后移植的,目的是为了让更广泛的观众了解这个问题。原始版本在使用 Javascript 的单线程浏览器环境中运行的事实应该更清楚为什么这个 "observable capturing" 可能有效(它确实有效并且没有像弄乱 RxJs 内部结构这样的肮脏黑客)。
class Program
{
private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();
private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
{
// wire some traps for capturing any observables "touched" by expression
expression();
// return observables touched by expression (not in this example of course)
if (observableDependencies.Count > 0)
return observableDependencies.Dequeue();
return new[] {Observable.Never<Unit>()}; // keep going
}
private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
{
return Observable.Create<Unit>(observer =>
{
var isComplete = new Subject<Unit>();
var isAborted = false;
var disp = Scheduler.Default.Schedule(self =>
{
Console.WriteLine("** Next iteration at {0}", sw.Elapsed);
capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
{
observer.OnNext(Unit.Default);
// self-destruct
isComplete.OnNext(Unit.Default);
},
() =>
{
Console.WriteLine("completed");
if (!isAborted)
self();
});
capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
});
return new CompositeDisposable(Disposable.Create(() =>
{
isAborted = true;
// self-destruct
isComplete.OnNext(Unit.Default);
}), disp);
});
}
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
Observable.Return(10).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});
var capturedObservables = new Subject<IObservable<Unit>[]>();
var obs = CreateObservable(capturedObservables, sw);
var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
Console.ReadLine();
disp.Dispose();
Console.ReadLine();
}
}
要回答您的第一个问题,SelectMany
是必需的,因为您有一个 3 级深度可观察对象:Observable 数组的主题。 Merge
只压平一层。 SelectMany
只是 Select
+ Merge
的简写。所以 SelectMany.Merge
正在应用 2 个展平运算符,这正是您所需要的。
第二个答案...看起来你可以只使用 Merge
+ FirstOrDefault
+ Defer
+ Repeat
甚至不使用主题:
var disp = Observable
.Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
.Merge()
.FirstOrDefault(Unit.Default))
.Repeat()
.Subscribe(...);
Defer
每次订阅都运行捕获函数
Merge
展平 observables 数组
FirstOrDefault
一旦任何 observables 产生一个值就结束流。如果所有这些都完成但没有产生值,那么它会产生一个你可以观察到的 Unit.Default
。
Repeat
在结束时重新订阅(由于 FirstOrDefault
),这会触发另一次捕获(由于 Defer
)。
转换回 TypeScript 显然很简单...
CreateObservable 的最终版本或多或少受到了 Brandon 的建议的启发。这表明 99% 的时间你认为你必须求助于安排事情,你做错了™
private static IObservable<Unit> CreateObservable()
{
return Observable.Create<Unit>(observer =>
{
var innerDisp = Observable.Defer(() =>
{
return Observable.Merge(
EvaluateExpressionAndCaptureTouchedObservables(() => false))
.Take(1); // done when any observable produces a value
})
.Repeat()
.Subscribe(x =>
{
observer.OnNext(Unit.Default);
});
return innerDisp;
});
}
下面的代码片段是我尝试创建以下功能:
- 创建订阅主题集合的可观察序列
- 当集合中的一个主题产生一个值时,序列结束,调用一个方法 returns 一组新的主题并从 1 重新开始。
- 当对外部可观察对象的订阅被处理掉时,整个事情就停止了
关于我的实现的问题:
- 为什么使用 subjectsSub.SelectMany(x => x).Merge() 但不使用 subjectsSub.Merge() ? (我本来希望后一种方法起作用)
- 在庞大的 Rx 功能库中是否有更简单、更优雅的解决方案?
更新:这个示例实际上是从 RxJS-Typescript 向后移植的,目的是为了让更广泛的观众了解这个问题。原始版本在使用 Javascript 的单线程浏览器环境中运行的事实应该更清楚为什么这个 "observable capturing" 可能有效(它确实有效并且没有像弄乱 RxJs 内部结构这样的肮脏黑客)。
class Program
{
private static readonly Queue<IObservable<Unit>[]> observableDependencies = new Queue<IObservable<Unit>[]>();
private static IObservable<Unit>[] EvaluateExpressionAndCaptureTouchedObservables(Func<object> expression)
{
// wire some traps for capturing any observables "touched" by expression
expression();
// return observables touched by expression (not in this example of course)
if (observableDependencies.Count > 0)
return observableDependencies.Dequeue();
return new[] {Observable.Never<Unit>()}; // keep going
}
private static IObservable<Unit> CreateObservable(
Subject<IObservable<Unit>[]> capturedObservables, Stopwatch sw)
{
return Observable.Create<Unit>(observer =>
{
var isComplete = new Subject<Unit>();
var isAborted = false;
var disp = Scheduler.Default.Schedule(self =>
{
Console.WriteLine("** Next iteration at {0}", sw.Elapsed);
capturedObservables.SelectMany(x => x).Merge().TakeUntil(isComplete).Subscribe(x =>
{
observer.OnNext(Unit.Default);
// self-destruct
isComplete.OnNext(Unit.Default);
},
() =>
{
Console.WriteLine("completed");
if (!isAborted)
self();
});
capturedObservables.OnNext(EvaluateExpressionAndCaptureTouchedObservables());
});
return new CompositeDisposable(Disposable.Create(() =>
{
isAborted = true;
// self-destruct
isComplete.OnNext(Unit.Default);
}), disp);
});
}
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(10)).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[]
{
Observable.Timer(TimeSpan.FromSeconds(5)).Select(x => Unit.Default),
Observable.Return(10).Select(x => Unit.Default)
});
observableDependencies.Enqueue(new[] {Observable.Timer(TimeSpan.FromSeconds(3)).Select(x => Unit.Default)});
var capturedObservables = new Subject<IObservable<Unit>[]>();
var obs = CreateObservable(capturedObservables, sw);
var disp = obs.Subscribe(x => Console.WriteLine("** fired at {0}", sw.Elapsed));
Console.ReadLine();
disp.Dispose();
Console.ReadLine();
}
}
要回答您的第一个问题,SelectMany
是必需的,因为您有一个 3 级深度可观察对象:Observable 数组的主题。 Merge
只压平一层。 SelectMany
只是 Select
+ Merge
的简写。所以 SelectMany.Merge
正在应用 2 个展平运算符,这正是您所需要的。
第二个答案...看起来你可以只使用 Merge
+ FirstOrDefault
+ Defer
+ Repeat
甚至不使用主题:
var disp = Observable
.Defer(() => EvaluateExpressionAndCaptureTouchedObservables()
.Merge()
.FirstOrDefault(Unit.Default))
.Repeat()
.Subscribe(...);
Defer
每次订阅都运行捕获函数
Merge
展平 observables 数组
FirstOrDefault
一旦任何 observables 产生一个值就结束流。如果所有这些都完成但没有产生值,那么它会产生一个你可以观察到的 Unit.Default
。
Repeat
在结束时重新订阅(由于 FirstOrDefault
),这会触发另一次捕获(由于 Defer
)。
转换回 TypeScript 显然很简单...
CreateObservable 的最终版本或多或少受到了 Brandon 的建议的启发。这表明 99% 的时间你认为你必须求助于安排事情,你做错了™
private static IObservable<Unit> CreateObservable()
{
return Observable.Create<Unit>(observer =>
{
var innerDisp = Observable.Defer(() =>
{
return Observable.Merge(
EvaluateExpressionAndCaptureTouchedObservables(() => false))
.Take(1); // done when any observable produces a value
})
.Repeat()
.Subscribe(x =>
{
observer.OnNext(Unit.Default);
});
return innerDisp;
});
}