如何只发出一致的计算?

How to only emit consistent calculations?

我正在使用响应式编程来进行大量计算。这是一个跟踪两个数字及其总和的简单示例:

static void Main(string[] args) {
    BehaviorSubject<int> x = new BehaviorSubject<int>(1);
    BehaviorSubject<int> y = new BehaviorSubject<int>(2);
    var sum = Observable.CombineLatest(x, y, (num1, num2) => num1 + num2);
    Observable
        .CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
        .Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));
    x.OnNext(3);
    Console.ReadLine();
}

这会生成以下输出:

X:1 Y:2 Sum:3
X:3 Y:2 Sum:3 
X:3 Y:2 Sum:5

注意第二个输出结果是"incorrect",因为它表明3+2=3。我理解为什么会发生这种情况(x 在总和更新之前更新)但我希望我的输出计算为 atomic/consistent - 在所有相关计算完成之前不应发出任何值。我的第一个方法是...

Observable.When(sum.And(Observable.CombineLatest(x, y)).Then((s, xy) => new { Sum = s, X = xy[0], Y = xy[1] } ));

这似乎适用于我的简单示例。但是我的实际代码有很多计算值,我不知道如何缩放它。例如,如果有一个 sum 和 squaredSum,我不知道如何在采取行动之前等待它们中的每一个发出一些东西。

一种应该可行的方法(理论上)是为我关心的所有值加上时间戳,如下所示。

Observable
    .CombineLatest(x.Timestamp(), y.Timestamp(), sum.Timestamp(), (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
    .Where(i=>i.Sum.Timestamp>i.X.Timestamp && i.Sum.Timestamp>i.Y.Timestamp)
    // do the calculation and subscribe

此方法适用于非常复杂的模型。我所要做的就是确保不会发出比任何核心数据值更旧的计算值。我觉得这有点麻烦。它实际上在我的控制台应用程序中不起作用。当我用分配了顺序 int64 的自定义扩展替换 Timestamp 时,它确实起作用了。

一般来说,处理这种事情的简单、干净的方法是什么?

=======

我在这方面取得了一些进展。这会等待 sum 和 sumSquared 在获取触发计算的数据值之前发出一个值。

var all = Observable.When(sum.And(sumSquared).And(Observable.CombineLatest(x, y)).Then((s, q, data) 
    => new { Sum = s, SumSquared = q, X = data[0], Y = data[1] }));

这应该可以满足您的要求:

Observable.CombineLatest(x, y, sum)
    .DistinctUntilChanged(list => list[2])
    .Subscribe(list => Console.WriteLine("{0}+{1}={2}", list[0], list[1], list[2]));

它一直等到总和被更新,这意味着它的所有源也必须被更新。

你的问题不是因为 x 在总和 本身更新之前更新了。这实际上与您构建查询的方式有关。

您已经有效地创建了两个查询:Observable.CombineLatest(x, y, (num1, num2) => num1 + num2)Observable.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })。由于您在每个订阅中都订阅了 x,因此您创建了两个订阅。这意味着当 x 更新时会发生两次更新。

您需要避免创建两个订阅。

如果你这样写代码:

BehaviorSubject<int> x = new BehaviorSubject<int>(1);
BehaviorSubject<int> y = new BehaviorSubject<int>(2);

Observable
    .CombineLatest(x, y, (num1, num2) => new
    {
        X = num1,
        Y = num2,
        Sum = num1 + num2
    })
    .Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));

x.OnNext(3);

...然后你正确地得到这个输出:

X:1 Y:2 Sum:3
X:3 Y:2 Sum:5

我已经开始思考这个问题了。这是我要完成的更详细的示例。这是一些验证名字和姓氏的代码,并且应该只在两个部分都有效时生成全名。如您所见,我正在尝试使用一堆独立定义的小函数,例如 "firstIsValid",然后将它们组合在一起以计算更复杂的东西。

我在这里面临的挑战似乎是尝试关联函数中的输入和输出。例如,"firstIsValid" 生成的输出表明某个名字有效,但不会告诉您是哪一个。在下面的选项 2 中,我可以使用 Zip 将它们关联起来。

如果验证函数不为每个输入生成一个输出,则此策略将不起作用。例如,如果用户正在输入网址,而我们正试图在网络上验证它们,也许我们会做一个 Throttle and/or Switch。单个 "webAddressIsValid" 可能有 10 个网址。在那种情况下,我认为我必须将输出与输入包括在一起。也许有一个 IObservable>,其中字符串是网址,布尔值是它是否有效。

static void Main(string[] args) {
    var first = new BehaviorSubject<string>(null);
    var last = new BehaviorSubject<string>(null);
    var firstIsValid = first.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);
    var lastIsValid = last.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);

    // OPTION 1 : Does not work
    // Output: bob smith, bob, bob roberts, roberts
    // firstIsValid and lastIsValid are not in sync with first and last
    //var whole = Observable
    //    .CombineLatest(first, firstIsValid, last, lastIsValid, (f, fv, l, lv) => new {
    //        First = f,
    //        Last = l,
    //        FirstIsValid = fv,
    //        LastIsValid = lv
    //    })
    //    .Where(i => i.FirstIsValid && i.LastIsValid)
    //    .Select(i => $"{i.First} {i.Last}");

    // OPTION 2 : Works as long as every change in a core data value generates one calculated value
    // Output: bob smith, bob robert
    var firstValidity = Observable.Zip(first, firstIsValid, (f, fv) => new { Name = f, IsValid = fv });
    var lastValidity = Observable.Zip(last, lastIsValid, (l, lv) => new { Name = l, IsValid = lv });
    var whole =
        Observable.CombineLatest(firstValidity, lastValidity, (f, l) => new { First = f, Last = l })
        .Where(i => i.First.IsValid && i.Last.IsValid)
        .Select(i => $"{i.First.Name} {i.Last.Name}");

    whole.Subscribe(i => Console.WriteLine(i));

    first.OnNext("bob");
    last.OnNext("smith");
    last.OnNext(null);
    last.OnNext("roberts");
    first.OnNext(null);

    Console.ReadLine();
}

另一种方法。每个值都有一个版本号(如时间戳)。任何时候计算值早于数据(或它所依赖的其他计算值)我们都可以忽略它。

public class VersionedValue {
    static long _version;
    public VersionedValue() { Version = Interlocked.Increment(ref _version); }
    public long Version { get; }
}

public class VersionedValue<T> : VersionedValue {
    public VersionedValue(T value) { Value = value; }
    public T Value { get; }
    public override string ToString() => $"{Value} {Version}";
}

public static class ExtensionMethods {
    public static IObservable<VersionedValue<T>> Versioned<T>(this IObservable<T> values) => values.Select(i => new VersionedValue<T>(i));
    public static VersionedValue<T> AsVersionedValue<T>(this T obj) => new VersionedValue<T>(obj);
}

static void Main(string[] args) {
    // same as before
    //
    var whole = Observable
        .CombineLatest(first.Versioned(), firstIsValid.Versioned(), last.Versioned(), lastIsValid.Versioned(), (f, fv, l, lv) => new {
            First = f,
            Last = l,
            FirstIsValid = fv,
            LastIsValid = lv
        })
        .Where(i => i.FirstIsValid.Version > i.First.Version && i.LastIsValid.Version > i.Last.Version)
        .Where(i => i.FirstIsValid.Value && i.LastIsValid.Value)
        .Select(i => $"{i.First.Value} {i.Last.Value}");