C# Reactive Extensions - 订阅聚合流
C# Reactive Extensions - Subscribe to stream of aggregation
我有一连串的点,想将每 2 个点组合起来画一条线。
public class MyPoint
{
public int X { get; set; }
public int Y { get; set; }
}
我正在寻找可以结合 Aggregate 和 Select 功能的东西,这意味着我想稍后订阅结合这两个点的复杂类型的以太币,或者接收一个聚合作为参数致我的 Observer 的 OnNext 代表:
类似于:
pointObservable.Subscribe((prev, curr) => { });
或
pointObservable.Subscribe((myLineStruct) => { });
构建样本:
List<MyPoint> points = new List<MyPoint>();
for (int i = 0; i < 10; i++)
{
points.Add(new MyPoint{ X = i , Y = i * 10});
}
IObservable<MyPoint> pointObservable = points.ToObservable();
在尝试了 2 种解决方案后,我遇到了一些问题:
首先是我的实际流:
observable // Stream of 250 points arriving every interval
.Take(_xmax + 10) // for test purposes take only the Graph + 10
.Select(NormalizeSampleByX) // Nomalize X ( override Real X with display X (
.Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX) // which returns current only if current.X > prev.X
.DistinctUntilChanged() // remove all redundant previous points elements
// here i end up with a stream of normalized points
.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr})
// Dmitry Ledentsov 's addition
.Subscribe(res =>
{
Debug.WriteLine(" {0} {1} , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);
});
加上德米特里,我得到了以下结果。
0 862 , 252 -21
1 888 , 253 -24
2 908 , 254 -28
3 931 , 255 -31
4 941 , 256 -35
5 890 , 257 -38
6 802 , 258 -41
7 676 , 259 -44
8 491 , 260 -48
9 289 , 261 -51
10 231 , 262 -55
@Enigmativity 的建议:
observable.Take(_xmax + 10)
.Select(NormalizeSample)
.Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)
.DistinctUntilChanged()
.Publish(obs => obs.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))
.Subscribe(res =>
{
Debug.WriteLine(" {0} {1} , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);
});
结果:
59 862 , 1 -21
60 867 , 2 -24
61 893 , 3 -28
62 912 , 4 -31
63 937 , 5 -35
64 937 , 6 -38
65 870 , 7 -41
66 777 , 8 -44
67 632 , 9 -48
68 444 , 10 -51
69 289 , 11 -55
...
...
最简单的方法可能是用移位的原始序列压缩序列:
var res = pointObservable.Zip(
pointObservable.Skip(1),
(p1, p2) => new { A = p1, B = p2 }
);
res.Subscribe(Console.WriteLine);
导致
{ A = (0,0), B = (1,10) }
{ A = (1,10), B = (2,20) }
{ A = (2,20), B = (3,30) }
{ A = (3,30), B = (4,40) }
{ A = (4,40), B = (5,50) }
{ A = (5,50), B = (6,60) }
{ A = (6,60), B = (7,70) }
{ A = (7,70), B = (8,80) }
{ A = (8,80), B = (9,90) }
给定 ToString
方法 MyPoint
更新:
根据评论,为了避免对订阅产生不必要的副作用,原始序列必须在压缩前 Publish
ed。因此, 使用 Scan
可能是您应该使用的。
使用詹姆斯的 CombineWithPrevious
:
var res = pointObservable
.CombineWithPrevious((p1, p2) => new { A = p1, B = p2 })
.Skip(1);
给出相同的结果
或 Engimativity 的更简洁版本:
var res = pointObservable
.Publish(po =>
po.Zip(
po.Skip(1),
(p1, p2) => new { A = p1, B = p2 }
)
);
你想要Scan
:
points
.Scan((LineSegment)null, (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
.Skip(1) // skip the first line segment which will not be valid
.Subscribe(lineSegment => ... );
Observable.Scan
是折叠或比较当前和先前项目的最简单方法。我写了这篇博文 here with some nice diagrams。这是那篇文章的代码,其中有一个具体的例子。扩展方法非常灵活,它适用于任何源和结果类型:
public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
this IObservable<TSource> source,
Func<TSource, TSource, TResult> resultSelector)
{
return source.Scan(
Tuple.Create(default(TSource), default(TSource)),
(previous, current) => Tuple.Create(previous.Item2, current))
.Select(t => resultSelector(t.Item1, t.Item2));
}
因此,如果您有像这样的复杂类型 Delta:
public class Delta
{
public Point P1 { get;set; }
public Point P2 { get;set; }
public static Delta Create(Point P1, Point P2)
{
return new Delta {
P1 = P1,
P2 = P2
};
}
public override string ToString()
{
return string.Format("Delta is (" + (P2.X - P1.X)
+ "," + (P2.Y - P1.Y) + ")");
}
}
您可以如下使用:
Subject<Point> ps = new Subject<Point>();
ps.CombineWithPrevious(Delta.Create)
.Subscribe(d => Console.WriteLine(d));
ps.OnNext(new Point(1,1));
ps.OnNext(new Point(2,2));
ps.OnNext(new Point(2,3));
您的输出将是:
Delta is (0,0)
Delta is (1,1)
Delta is (1,1)
Delta is (2,3)
请注意 default(TSource)
用于设置初始默认值 - 您可以轻松修改它以指定初始默认值,或者在结果选择器中处理它,或者跳过第一个元素等(.Skip(1)
) - 有很多选择。
我有一连串的点,想将每 2 个点组合起来画一条线。
public class MyPoint
{
public int X { get; set; }
public int Y { get; set; }
}
我正在寻找可以结合 Aggregate 和 Select 功能的东西,这意味着我想稍后订阅结合这两个点的复杂类型的以太币,或者接收一个聚合作为参数致我的 Observer 的 OnNext 代表:
类似于:
pointObservable.Subscribe((prev, curr) => { });
或
pointObservable.Subscribe((myLineStruct) => { });
构建样本:
List<MyPoint> points = new List<MyPoint>();
for (int i = 0; i < 10; i++)
{
points.Add(new MyPoint{ X = i , Y = i * 10});
}
IObservable<MyPoint> pointObservable = points.ToObservable();
在尝试了 2 种解决方案后,我遇到了一些问题:
首先是我的实际流:
observable // Stream of 250 points arriving every interval
.Take(_xmax + 10) // for test purposes take only the Graph + 10
.Select(NormalizeSampleByX) // Nomalize X ( override Real X with display X (
.Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX) // which returns current only if current.X > prev.X
.DistinctUntilChanged() // remove all redundant previous points elements
// here i end up with a stream of normalized points
.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr})
// Dmitry Ledentsov 's addition
.Subscribe(res =>
{
Debug.WriteLine(" {0} {1} , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);
});
加上德米特里,我得到了以下结果。
0 862 , 252 -21
1 888 , 253 -24
2 908 , 254 -28
3 931 , 255 -31
4 941 , 256 -35
5 890 , 257 -38
6 802 , 258 -41
7 676 , 259 -44
8 491 , 260 -48
9 289 , 261 -51
10 231 , 262 -55
@Enigmativity 的建议:
observable.Take(_xmax + 10)
.Select(NormalizeSample)
.Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)
.DistinctUntilChanged()
.Publish(obs => obs.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))
.Subscribe(res =>
{
Debug.WriteLine(" {0} {1} , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);
});
结果:
59 862 , 1 -21
60 867 , 2 -24
61 893 , 3 -28
62 912 , 4 -31
63 937 , 5 -35
64 937 , 6 -38
65 870 , 7 -41
66 777 , 8 -44
67 632 , 9 -48
68 444 , 10 -51
69 289 , 11 -55
...
...
最简单的方法可能是用移位的原始序列压缩序列:
var res = pointObservable.Zip(
pointObservable.Skip(1),
(p1, p2) => new { A = p1, B = p2 }
);
res.Subscribe(Console.WriteLine);
导致
{ A = (0,0), B = (1,10) }
{ A = (1,10), B = (2,20) }
{ A = (2,20), B = (3,30) }
{ A = (3,30), B = (4,40) }
{ A = (4,40), B = (5,50) }
{ A = (5,50), B = (6,60) }
{ A = (6,60), B = (7,70) }
{ A = (7,70), B = (8,80) }
{ A = (8,80), B = (9,90) }
给定 ToString
方法 MyPoint
更新:
根据评论,为了避免对订阅产生不必要的副作用,原始序列必须在压缩前 Publish
ed。因此,Scan
可能是您应该使用的。
使用詹姆斯的 CombineWithPrevious
:
var res = pointObservable
.CombineWithPrevious((p1, p2) => new { A = p1, B = p2 })
.Skip(1);
给出相同的结果
或 Engimativity 的更简洁版本:
var res = pointObservable
.Publish(po =>
po.Zip(
po.Skip(1),
(p1, p2) => new { A = p1, B = p2 }
)
);
你想要Scan
:
points
.Scan((LineSegment)null, (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
.Skip(1) // skip the first line segment which will not be valid
.Subscribe(lineSegment => ... );
Observable.Scan
是折叠或比较当前和先前项目的最简单方法。我写了这篇博文 here with some nice diagrams。这是那篇文章的代码,其中有一个具体的例子。扩展方法非常灵活,它适用于任何源和结果类型:
public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
this IObservable<TSource> source,
Func<TSource, TSource, TResult> resultSelector)
{
return source.Scan(
Tuple.Create(default(TSource), default(TSource)),
(previous, current) => Tuple.Create(previous.Item2, current))
.Select(t => resultSelector(t.Item1, t.Item2));
}
因此,如果您有像这样的复杂类型 Delta:
public class Delta
{
public Point P1 { get;set; }
public Point P2 { get;set; }
public static Delta Create(Point P1, Point P2)
{
return new Delta {
P1 = P1,
P2 = P2
};
}
public override string ToString()
{
return string.Format("Delta is (" + (P2.X - P1.X)
+ "," + (P2.Y - P1.Y) + ")");
}
}
您可以如下使用:
Subject<Point> ps = new Subject<Point>();
ps.CombineWithPrevious(Delta.Create)
.Subscribe(d => Console.WriteLine(d));
ps.OnNext(new Point(1,1));
ps.OnNext(new Point(2,2));
ps.OnNext(new Point(2,3));
您的输出将是:
Delta is (0,0)
Delta is (1,1)
Delta is (1,1)
Delta is (2,3)
请注意 default(TSource)
用于设置初始默认值 - 您可以轻松修改它以指定初始默认值,或者在结果选择器中处理它,或者跳过第一个元素等(.Skip(1)
) - 有很多选择。