将可观察数据聚合到多个桶中
Aggregating observable data into multiple buckets
我需要一个关于如何分发聚合更新的好主意...
假设我有一个 IObservable 的 Id 和一个产生永无止境的消息流(5-10,000/秒)的值。现在我想计算很多聚合(例如总和)
用于定期分发到其他系统 - 假设每个聚合每 10 秒一次。
聚合基于元组的 ID(字符串),但可能会落入多个聚合(聚合定义应包含哪些 ID - 因此会重叠)。
会有几千个聚合定义,所以有人知道如何解决这个问题吗?
概念上:
public struct Update
{
public string Id { get; }
public int Value { get; }
}
public class Aggregate
{
Dictionary<string, Update> latestValues = new Dictionary<string, Update>();
public void AddUpdate(Update update)
{
latestValues[update.Id] = update;
}
public int CalculateSum()
{
return latestValues.Values.Select(v => v.Value).Sum();
}
}
更新:
这个问题的目的是简化真正的问题 - 也许我没有做得那么好 - 抱歉。
假设我有多个产生温度的 IOT 设备并定期报告此温度(更新流)。然后不同的用户可以选择查看设备子集的聚合(例如平均)值。因此,一位客户可能希望查看设备 1、2 和 3 的平均值,而另一位客户可能希望查看设备 2、3 和 4 等的平均值(聚合定义)
我想你问的是如何使用 Rx 创建实时读取模型*。
鉴于我从你的问题中可以猜到的,我认为你希望能够用每条更新消息更新一些当前状态。对于您的 CalculateSum
方法,您不能只对所有消息的 Value
属性 求和,因为有些消息的目的是 update/override 现有值。
所以根据这个假设,看起来 GroupBy
会成为你的朋友。如果您首先将可观察到的值序列拆分为子序列,您可以分而治之。
input.GroupBy(i=>i.Id)
如果我们只考虑属于同一 ID 的单个值流,那么每个值的总和应该是多少?
-1--1--2-
在这个简单的例子中,答案总是直接通过的值。即
input -1--1--2-
result -1--1--2-
然而,当我们查看两个产生值的序列时,计算起来会稍微困难一些
inputA -1-1-2--------
inputB --1-2-2-3-5-2-
result -122344-5-7-4-
这里我们需要查看序列中每个值的增量,并将该增量推送到结果。可以这样形象化
inputA -1-1-2--------
delta -1-0-1--------
inputB --1-2-2-3-5-2-
delta --1-1-0-1-2-(-3)-
result -122344-5-7-4-
要创建这种增量投影,您可以编写类似
的内容
input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
.Select(acc => acc.Delta);
将这些代码放在一起可能如下所示:
void Main()
{
var testScheduler = new TestScheduler();
var input = testScheduler.CreateColdObservable<Update>(
ReactiveTest.OnNext(010, new Update("a", 1)), //1
ReactiveTest.OnNext(020, new Update("b", 1)), //2
ReactiveTest.OnNext(030, new Update("c", 3)), //5
ReactiveTest.OnNext(040, new Update("a", 1)), //5
ReactiveTest.OnNext(050, new Update("b", 2)), //6
ReactiveTest.OnNext(060, new Update("a", 2)), //7
ReactiveTest.OnNext(070, new Update("b", 2)), //7
ReactiveTest.OnNext(080, new Update("b", 3)), //8
ReactiveTest.OnNext(090, new Update("b", 5)), //10
ReactiveTest.OnNext(100, new Update("b", 2)) //7
);
var currentSum = input.GroupBy(i => i.Id)
.SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
.Select(acc => acc.Delta)
.Scan((acc, cur) => acc + cur);
var observer = testScheduler.CreateObserver<int>();
var subscription = currentSum.Subscribe(observer);
testScheduler.Start();
subscription.Dispose();
ReactiveAssert.AreElementsEqual(new[]
{
ReactiveTest.OnNext(010, 1),
ReactiveTest.OnNext(020, 2),
ReactiveTest.OnNext(030, 5),
ReactiveTest.OnNext(040, 5),
ReactiveTest.OnNext(050, 6),
ReactiveTest.OnNext(060, 7),
ReactiveTest.OnNext(070, 7),
ReactiveTest.OnNext(080, 8),
ReactiveTest.OnNext(090, 10),
ReactiveTest.OnNext(100, 7)}
,
observer.Messages);
}
// Define other methods and classes here
public struct Update
{
public Update(string id, int value)
{
Id = id;
Value = value;
}
public string Id { get; }
public int Value { get; }
}
如果你想创建多个聚合,那么每个新聚合就像上面那样只是一个查询。您可以在分组后通过 sharing/publishing 优化序列,但我首先要确保这是分析所必需的。
*阅读 CQRS/ES 术语中的模型。
我需要一个关于如何分发聚合更新的好主意...
假设我有一个 IObservable 的 Id 和一个产生永无止境的消息流(5-10,000/秒)的值。现在我想计算很多聚合(例如总和) 用于定期分发到其他系统 - 假设每个聚合每 10 秒一次。 聚合基于元组的 ID(字符串),但可能会落入多个聚合(聚合定义应包含哪些 ID - 因此会重叠)。
会有几千个聚合定义,所以有人知道如何解决这个问题吗?
概念上:
public struct Update
{
public string Id { get; }
public int Value { get; }
}
public class Aggregate
{
Dictionary<string, Update> latestValues = new Dictionary<string, Update>();
public void AddUpdate(Update update)
{
latestValues[update.Id] = update;
}
public int CalculateSum()
{
return latestValues.Values.Select(v => v.Value).Sum();
}
}
更新:
这个问题的目的是简化真正的问题 - 也许我没有做得那么好 - 抱歉。 假设我有多个产生温度的 IOT 设备并定期报告此温度(更新流)。然后不同的用户可以选择查看设备子集的聚合(例如平均)值。因此,一位客户可能希望查看设备 1、2 和 3 的平均值,而另一位客户可能希望查看设备 2、3 和 4 等的平均值(聚合定义)
我想你问的是如何使用 Rx 创建实时读取模型*。
鉴于我从你的问题中可以猜到的,我认为你希望能够用每条更新消息更新一些当前状态。对于您的 CalculateSum
方法,您不能只对所有消息的 Value
属性 求和,因为有些消息的目的是 update/override 现有值。
所以根据这个假设,看起来 GroupBy
会成为你的朋友。如果您首先将可观察到的值序列拆分为子序列,您可以分而治之。
input.GroupBy(i=>i.Id)
如果我们只考虑属于同一 ID 的单个值流,那么每个值的总和应该是多少?
-1--1--2-
在这个简单的例子中,答案总是直接通过的值。即
input -1--1--2-
result -1--1--2-
然而,当我们查看两个产生值的序列时,计算起来会稍微困难一些
inputA -1-1-2--------
inputB --1-2-2-3-5-2-
result -122344-5-7-4-
这里我们需要查看序列中每个值的增量,并将该增量推送到结果。可以这样形象化
inputA -1-1-2--------
delta -1-0-1--------
inputB --1-2-2-3-5-2-
delta --1-1-0-1-2-(-3)-
result -122344-5-7-4-
要创建这种增量投影,您可以编写类似
的内容input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue }))
.Select(acc => acc.Delta);
将这些代码放在一起可能如下所示:
void Main()
{
var testScheduler = new TestScheduler();
var input = testScheduler.CreateColdObservable<Update>(
ReactiveTest.OnNext(010, new Update("a", 1)), //1
ReactiveTest.OnNext(020, new Update("b", 1)), //2
ReactiveTest.OnNext(030, new Update("c", 3)), //5
ReactiveTest.OnNext(040, new Update("a", 1)), //5
ReactiveTest.OnNext(050, new Update("b", 2)), //6
ReactiveTest.OnNext(060, new Update("a", 2)), //7
ReactiveTest.OnNext(070, new Update("b", 2)), //7
ReactiveTest.OnNext(080, new Update("b", 3)), //8
ReactiveTest.OnNext(090, new Update("b", 5)), //10
ReactiveTest.OnNext(100, new Update("b", 2)) //7
);
var currentSum = input.GroupBy(i => i.Id)
.SelectMany(grp => grp.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
.Select(acc => acc.Delta)
.Scan((acc, cur) => acc + cur);
var observer = testScheduler.CreateObserver<int>();
var subscription = currentSum.Subscribe(observer);
testScheduler.Start();
subscription.Dispose();
ReactiveAssert.AreElementsEqual(new[]
{
ReactiveTest.OnNext(010, 1),
ReactiveTest.OnNext(020, 2),
ReactiveTest.OnNext(030, 5),
ReactiveTest.OnNext(040, 5),
ReactiveTest.OnNext(050, 6),
ReactiveTest.OnNext(060, 7),
ReactiveTest.OnNext(070, 7),
ReactiveTest.OnNext(080, 8),
ReactiveTest.OnNext(090, 10),
ReactiveTest.OnNext(100, 7)}
,
observer.Messages);
}
// Define other methods and classes here
public struct Update
{
public Update(string id, int value)
{
Id = id;
Value = value;
}
public string Id { get; }
public int Value { get; }
}
如果你想创建多个聚合,那么每个新聚合就像上面那样只是一个查询。您可以在分组后通过 sharing/publishing 优化序列,但我首先要确保这是分析所必需的。
*阅读 CQRS/ES 术语中的模型。