需要状态的自包含 Reactive Extensions 辅助方法

Self contained Reactive Extensions helper methods that require state

查看 https://eprystupa.wordpress.com/2009/12/18/detecting-running-highlow-prices-using-reactive-extensions-for-net/ 它有一个有趣的代码块:

var rnd = new Random();
var feed = Observable.Defer(() =>
    Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
    .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
    .Repeat();

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

// Daily high price feed
double max = double.MinValue;
var feedHi = feed
    .Where(p => p > max)
    .Do(p => max = Math.Max(max, p))
    .Select(p => "New HI: " + p);

// Combine hi and lo in one feed and subscribe to it
feedLo.Merge(feedHi).Subscribe(Console.WriteLine);

以上是可以的,但局部变量 maxmin 意味着代码非常具体,而我想附上 NewLowHi code/indicator 到现有的 IObservable<double> 很像 https://github.com/fiatsasia/Financier 有:

public static IObservable<TSource> SimpleMovingAverage<TSource>(this IObservable<TSource> source, int period)
{
    return source.Buffer(period, 1).Select(e => e.Average());
}

创建一个自包含的 NewLowHi 指标的最佳实践是什么,我可以在不使用(或至少在内部隐藏)局部变量 maxmin 的情况下订阅它?

您在 WordPress 网站上提到的代码有一些缺陷。

由于他们创建 feed 的方式,它是一个热门观察,因为每个订阅都会收到一组不同的数字。所以 feedLofeedHi observables 将从不同的变量集工作。

但情况变得更糟。例如feedLo有两个订阅,那么feed会有两个订阅,而min只有一个状态变量,这意味着出来的值将是最小值两个订阅的数量,而不是每个订阅的最小值。

我将展示如何正确执行此操作,但首先您的问题是关于如何封装状态。方法如下:

IObservable<T> feed =
    Observable
        .Defer(() =>
        {
            int state = 42;
            return Observable... // define your observable here.
        });

现在,feed 源使用 Random 作为其状态。我们可以继续使用上面的模式重写 feed

var feed =
    Observable
        .Defer(() =>
        {
            var rnd = new Random();
            return
                Observable
                    .Generate(
                        0, x => true, x => x,
                        x => Math.Round(30.0 + rnd.NextDouble(), 2),
                        x => TimeSpan.FromSeconds(rnd.NextDouble()));
        });

Defer/Return/Delay/Repeat 模式相比,我更喜欢使用 Observable.Generate

现在介绍如何获取最小值和最大值。

我想要一个 IObservable<(State state, double value)>,它可以给我来自对源可观察对象的单个订阅的高值和低值。这是 State 的样子:

public enum State
{
    High,
    Low,
}

这是我的观察结果:

IObservable<(State state, double value)> feedHighLow(IObservable<double> source) =>
    source.Publish(xs => Observable.Merge(
        xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
        xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));

现在我可以调用 feedHighLow(feed) 并从对源提要的单个订阅中获取 High/Low 值的流。 Publish 调用确保对源的单一订阅,Merge 意味着我可以 运行 两个不同的可观察值分别获得最小值和最大值。

我得到这样的结果: