如何只发出一致的计算?
How to only emit consistent calculations?
我正在使用响应式编程来进行大量计算。这是一个跟踪两个数字及其总和的简单示例:
static void Main(string[] args) {
BehaviorSubject<int> x = new BehaviorSubject<int>(1);
BehaviorSubject<int> y = new BehaviorSubject<int>(2);
var sum = Observable.CombineLatest(x, y, (num1, num2) => num1 + num2);
Observable
.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
.Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));
x.OnNext(3);
Console.ReadLine();
}
这会生成以下输出:
X:1 Y:2 Sum:3
X:3 Y:2 Sum:3
X:3 Y:2 Sum:5
注意第二个输出结果是"incorrect",因为它表明3+2=3。我理解为什么会发生这种情况(x 在总和更新之前更新)但我希望我的输出计算为 atomic/consistent - 在所有相关计算完成之前不应发出任何值。我的第一个方法是...
Observable.When(sum.And(Observable.CombineLatest(x, y)).Then((s, xy) => new { Sum = s, X = xy[0], Y = xy[1] } ));
这似乎适用于我的简单示例。但是我的实际代码有很多计算值,我不知道如何缩放它。例如,如果有一个 sum 和 squaredSum,我不知道如何在采取行动之前等待它们中的每一个发出一些东西。
一种应该可行的方法(理论上)是为我关心的所有值加上时间戳,如下所示。
Observable
.CombineLatest(x.Timestamp(), y.Timestamp(), sum.Timestamp(), (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
.Where(i=>i.Sum.Timestamp>i.X.Timestamp && i.Sum.Timestamp>i.Y.Timestamp)
// do the calculation and subscribe
此方法适用于非常复杂的模型。我所要做的就是确保不会发出比任何核心数据值更旧的计算值。我觉得这有点麻烦。它实际上在我的控制台应用程序中不起作用。当我用分配了顺序 int64 的自定义扩展替换 Timestamp 时,它确实起作用了。
一般来说,处理这种事情的简单、干净的方法是什么?
=======
我在这方面取得了一些进展。这会等待 sum 和 sumSquared 在获取触发计算的数据值之前发出一个值。
var all = Observable.When(sum.And(sumSquared).And(Observable.CombineLatest(x, y)).Then((s, q, data)
=> new { Sum = s, SumSquared = q, X = data[0], Y = data[1] }));
这应该可以满足您的要求:
Observable.CombineLatest(x, y, sum)
.DistinctUntilChanged(list => list[2])
.Subscribe(list => Console.WriteLine("{0}+{1}={2}", list[0], list[1], list[2]));
它一直等到总和被更新,这意味着它的所有源也必须被更新。
你的问题不是因为 x
在总和 本身更新之前更新了。这实际上与您构建查询的方式有关。
您已经有效地创建了两个查询:Observable.CombineLatest(x, y, (num1, num2) => num1 + num2)
和 Observable.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
。由于您在每个订阅中都订阅了 x
,因此您创建了两个订阅。这意味着当 x
更新时会发生两次更新。
您需要避免创建两个订阅。
如果你这样写代码:
BehaviorSubject<int> x = new BehaviorSubject<int>(1);
BehaviorSubject<int> y = new BehaviorSubject<int>(2);
Observable
.CombineLatest(x, y, (num1, num2) => new
{
X = num1,
Y = num2,
Sum = num1 + num2
})
.Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));
x.OnNext(3);
...然后你正确地得到这个输出:
X:1 Y:2 Sum:3
X:3 Y:2 Sum:5
我已经开始思考这个问题了。这是我要完成的更详细的示例。这是一些验证名字和姓氏的代码,并且应该只在两个部分都有效时生成全名。如您所见,我正在尝试使用一堆独立定义的小函数,例如 "firstIsValid",然后将它们组合在一起以计算更复杂的东西。
我在这里面临的挑战似乎是尝试关联函数中的输入和输出。例如,"firstIsValid" 生成的输出表明某个名字有效,但不会告诉您是哪一个。在下面的选项 2 中,我可以使用 Zip 将它们关联起来。
如果验证函数不为每个输入生成一个输出,则此策略将不起作用。例如,如果用户正在输入网址,而我们正试图在网络上验证它们,也许我们会做一个 Throttle and/or Switch。单个 "webAddressIsValid" 可能有 10 个网址。在那种情况下,我认为我必须将输出与输入包括在一起。也许有一个 IObservable>,其中字符串是网址,布尔值是它是否有效。
static void Main(string[] args) {
var first = new BehaviorSubject<string>(null);
var last = new BehaviorSubject<string>(null);
var firstIsValid = first.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);
var lastIsValid = last.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);
// OPTION 1 : Does not work
// Output: bob smith, bob, bob roberts, roberts
// firstIsValid and lastIsValid are not in sync with first and last
//var whole = Observable
// .CombineLatest(first, firstIsValid, last, lastIsValid, (f, fv, l, lv) => new {
// First = f,
// Last = l,
// FirstIsValid = fv,
// LastIsValid = lv
// })
// .Where(i => i.FirstIsValid && i.LastIsValid)
// .Select(i => $"{i.First} {i.Last}");
// OPTION 2 : Works as long as every change in a core data value generates one calculated value
// Output: bob smith, bob robert
var firstValidity = Observable.Zip(first, firstIsValid, (f, fv) => new { Name = f, IsValid = fv });
var lastValidity = Observable.Zip(last, lastIsValid, (l, lv) => new { Name = l, IsValid = lv });
var whole =
Observable.CombineLatest(firstValidity, lastValidity, (f, l) => new { First = f, Last = l })
.Where(i => i.First.IsValid && i.Last.IsValid)
.Select(i => $"{i.First.Name} {i.Last.Name}");
whole.Subscribe(i => Console.WriteLine(i));
first.OnNext("bob");
last.OnNext("smith");
last.OnNext(null);
last.OnNext("roberts");
first.OnNext(null);
Console.ReadLine();
}
另一种方法。每个值都有一个版本号(如时间戳)。任何时候计算值早于数据(或它所依赖的其他计算值)我们都可以忽略它。
public class VersionedValue {
static long _version;
public VersionedValue() { Version = Interlocked.Increment(ref _version); }
public long Version { get; }
}
public class VersionedValue<T> : VersionedValue {
public VersionedValue(T value) { Value = value; }
public T Value { get; }
public override string ToString() => $"{Value} {Version}";
}
public static class ExtensionMethods {
public static IObservable<VersionedValue<T>> Versioned<T>(this IObservable<T> values) => values.Select(i => new VersionedValue<T>(i));
public static VersionedValue<T> AsVersionedValue<T>(this T obj) => new VersionedValue<T>(obj);
}
static void Main(string[] args) {
// same as before
//
var whole = Observable
.CombineLatest(first.Versioned(), firstIsValid.Versioned(), last.Versioned(), lastIsValid.Versioned(), (f, fv, l, lv) => new {
First = f,
Last = l,
FirstIsValid = fv,
LastIsValid = lv
})
.Where(i => i.FirstIsValid.Version > i.First.Version && i.LastIsValid.Version > i.Last.Version)
.Where(i => i.FirstIsValid.Value && i.LastIsValid.Value)
.Select(i => $"{i.First.Value} {i.Last.Value}");
我正在使用响应式编程来进行大量计算。这是一个跟踪两个数字及其总和的简单示例:
static void Main(string[] args) {
BehaviorSubject<int> x = new BehaviorSubject<int>(1);
BehaviorSubject<int> y = new BehaviorSubject<int>(2);
var sum = Observable.CombineLatest(x, y, (num1, num2) => num1 + num2);
Observable
.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
.Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));
x.OnNext(3);
Console.ReadLine();
}
这会生成以下输出:
X:1 Y:2 Sum:3
X:3 Y:2 Sum:3
X:3 Y:2 Sum:5
注意第二个输出结果是"incorrect",因为它表明3+2=3。我理解为什么会发生这种情况(x 在总和更新之前更新)但我希望我的输出计算为 atomic/consistent - 在所有相关计算完成之前不应发出任何值。我的第一个方法是...
Observable.When(sum.And(Observable.CombineLatest(x, y)).Then((s, xy) => new { Sum = s, X = xy[0], Y = xy[1] } ));
这似乎适用于我的简单示例。但是我的实际代码有很多计算值,我不知道如何缩放它。例如,如果有一个 sum 和 squaredSum,我不知道如何在采取行动之前等待它们中的每一个发出一些东西。
一种应该可行的方法(理论上)是为我关心的所有值加上时间戳,如下所示。
Observable
.CombineLatest(x.Timestamp(), y.Timestamp(), sum.Timestamp(), (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
.Where(i=>i.Sum.Timestamp>i.X.Timestamp && i.Sum.Timestamp>i.Y.Timestamp)
// do the calculation and subscribe
此方法适用于非常复杂的模型。我所要做的就是确保不会发出比任何核心数据值更旧的计算值。我觉得这有点麻烦。它实际上在我的控制台应用程序中不起作用。当我用分配了顺序 int64 的自定义扩展替换 Timestamp 时,它确实起作用了。
一般来说,处理这种事情的简单、干净的方法是什么?
=======
我在这方面取得了一些进展。这会等待 sum 和 sumSquared 在获取触发计算的数据值之前发出一个值。
var all = Observable.When(sum.And(sumSquared).And(Observable.CombineLatest(x, y)).Then((s, q, data)
=> new { Sum = s, SumSquared = q, X = data[0], Y = data[1] }));
这应该可以满足您的要求:
Observable.CombineLatest(x, y, sum)
.DistinctUntilChanged(list => list[2])
.Subscribe(list => Console.WriteLine("{0}+{1}={2}", list[0], list[1], list[2]));
它一直等到总和被更新,这意味着它的所有源也必须被更新。
你的问题不是因为 x
在总和 本身更新之前更新了。这实际上与您构建查询的方式有关。
您已经有效地创建了两个查询:Observable.CombineLatest(x, y, (num1, num2) => num1 + num2)
和 Observable.CombineLatest(x, y, sum, (xx, yy, sumsum) => new { X = xx, Y = yy, Sum = sumsum })
。由于您在每个订阅中都订阅了 x
,因此您创建了两个订阅。这意味着当 x
更新时会发生两次更新。
您需要避免创建两个订阅。
如果你这样写代码:
BehaviorSubject<int> x = new BehaviorSubject<int>(1);
BehaviorSubject<int> y = new BehaviorSubject<int>(2);
Observable
.CombineLatest(x, y, (num1, num2) => new
{
X = num1,
Y = num2,
Sum = num1 + num2
})
.Subscribe(i => Console.WriteLine($"X:{i.X} Y:{i.Y} Sum:{i.Sum}"));
x.OnNext(3);
...然后你正确地得到这个输出:
X:1 Y:2 Sum:3 X:3 Y:2 Sum:5
我已经开始思考这个问题了。这是我要完成的更详细的示例。这是一些验证名字和姓氏的代码,并且应该只在两个部分都有效时生成全名。如您所见,我正在尝试使用一堆独立定义的小函数,例如 "firstIsValid",然后将它们组合在一起以计算更复杂的东西。
我在这里面临的挑战似乎是尝试关联函数中的输入和输出。例如,"firstIsValid" 生成的输出表明某个名字有效,但不会告诉您是哪一个。在下面的选项 2 中,我可以使用 Zip 将它们关联起来。
如果验证函数不为每个输入生成一个输出,则此策略将不起作用。例如,如果用户正在输入网址,而我们正试图在网络上验证它们,也许我们会做一个 Throttle and/or Switch。单个 "webAddressIsValid" 可能有 10 个网址。在那种情况下,我认为我必须将输出与输入包括在一起。也许有一个 IObservable>,其中字符串是网址,布尔值是它是否有效。
static void Main(string[] args) {
var first = new BehaviorSubject<string>(null);
var last = new BehaviorSubject<string>(null);
var firstIsValid = first.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);
var lastIsValid = last.Select(i => string.IsNullOrEmpty(i) || i.Length < 3 ? false : true);
// OPTION 1 : Does not work
// Output: bob smith, bob, bob roberts, roberts
// firstIsValid and lastIsValid are not in sync with first and last
//var whole = Observable
// .CombineLatest(first, firstIsValid, last, lastIsValid, (f, fv, l, lv) => new {
// First = f,
// Last = l,
// FirstIsValid = fv,
// LastIsValid = lv
// })
// .Where(i => i.FirstIsValid && i.LastIsValid)
// .Select(i => $"{i.First} {i.Last}");
// OPTION 2 : Works as long as every change in a core data value generates one calculated value
// Output: bob smith, bob robert
var firstValidity = Observable.Zip(first, firstIsValid, (f, fv) => new { Name = f, IsValid = fv });
var lastValidity = Observable.Zip(last, lastIsValid, (l, lv) => new { Name = l, IsValid = lv });
var whole =
Observable.CombineLatest(firstValidity, lastValidity, (f, l) => new { First = f, Last = l })
.Where(i => i.First.IsValid && i.Last.IsValid)
.Select(i => $"{i.First.Name} {i.Last.Name}");
whole.Subscribe(i => Console.WriteLine(i));
first.OnNext("bob");
last.OnNext("smith");
last.OnNext(null);
last.OnNext("roberts");
first.OnNext(null);
Console.ReadLine();
}
另一种方法。每个值都有一个版本号(如时间戳)。任何时候计算值早于数据(或它所依赖的其他计算值)我们都可以忽略它。
public class VersionedValue {
static long _version;
public VersionedValue() { Version = Interlocked.Increment(ref _version); }
public long Version { get; }
}
public class VersionedValue<T> : VersionedValue {
public VersionedValue(T value) { Value = value; }
public T Value { get; }
public override string ToString() => $"{Value} {Version}";
}
public static class ExtensionMethods {
public static IObservable<VersionedValue<T>> Versioned<T>(this IObservable<T> values) => values.Select(i => new VersionedValue<T>(i));
public static VersionedValue<T> AsVersionedValue<T>(this T obj) => new VersionedValue<T>(obj);
}
static void Main(string[] args) {
// same as before
//
var whole = Observable
.CombineLatest(first.Versioned(), firstIsValid.Versioned(), last.Versioned(), lastIsValid.Versioned(), (f, fv, l, lv) => new {
First = f,
Last = l,
FirstIsValid = fv,
LastIsValid = lv
})
.Where(i => i.FirstIsValid.Version > i.First.Version && i.LastIsValid.Version > i.Last.Version)
.Where(i => i.FirstIsValid.Value && i.LastIsValid.Value)
.Select(i => $"{i.First.Value} {i.Last.Value}");