C# Reactive Extensions - 订阅聚合流

C# Reactive Extensions - Subscribe to stream of aggregation

我有一连串的点,想将每 2 个点组合起来画一条线。

public class MyPoint
{
    public int X { get; set; }
    public int Y { get; set; }
}

我正在寻找可以结合 Aggregate 和 Select 功能的东西,这意味着我想稍后订阅结合这两个点的复杂类型的以太币,或者接收一个聚合作为参数致我的 Observer 的 OnNext 代表:

类似于:

    pointObservable.Subscribe((prev, curr) => { }); 

    pointObservable.Subscribe((myLineStruct) => { }); 

构建样本:

  List<MyPoint> points = new List<MyPoint>();

  for (int i = 0; i < 10; i++)
  {
       points.Add(new MyPoint{ X = i , Y = i * 10});
  }

  IObservable<MyPoint> pointObservable = points.ToObservable();

在尝试了 2 种解决方案后,我遇到了一些问题:

首先是我的实际流:

observable  // Stream of 250 points arriving every interval 
     .Take(_xmax + 10)   // for test purposes take only the Graph + 10 
     .Select(NormalizeSampleByX) // Nomalize X  ( override Real X with display X (
     .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)  // which returns current only if current.X > prev.X 
     .DistinctUntilChanged() // remove all redundant previous points elements 

     // here i end up with a stream of normalized points 

     .Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr})
             // Dmitry Ledentsov 's addition  
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

加上德米特里,我得到了以下结果。

0 862  , 252 -21 
1 888  , 253 -24 
2 908  , 254 -28 
3 931  , 255 -31 
4 941  , 256 -35 
5 890  , 257 -38 
6 802  , 258 -41 
7 676  , 259 -44 
8 491  , 260 -48 
9 289  , 261 -51 
10 231  , 262 -55 

@Enigmativity 的建议:

 observable.Take(_xmax + 10)
            .Select(NormalizeSample)
            .Scan(new PlotterEcgSample(-1, 0), MergeSamplesWithDistinctX)
            .DistinctUntilChanged()
            .Publish(obs => obs.Zip(observable.Skip(1), (prev, curr) => new {Prev = prev, Curr = curr}))
            .Subscribe(res =>
            {
               Debug.WriteLine(" {0} {1}  , {2} {3}", res.Prev.X, res.Prev.Y, res.Curr.X , res.Curr.Y);   
            });

结果:

 59 862  , 1 -21 
 60 867  , 2 -24 
 61 893  , 3 -28 
 62 912  , 4 -31 
 63 937  , 5 -35 
 64 937  , 6 -38 
 65 870  , 7 -41 
 66 777  , 8 -44 
 67 632  , 9 -48 
 68 444  , 10 -51 
 69 289  , 11 -55  
 ...
 ...

最简单的方法可能是用移位的原始序列压缩序列:

var res = pointObservable.Zip(
    pointObservable.Skip(1),
    (p1, p2) => new { A = p1, B = p2 }
);

res.Subscribe(Console.WriteLine);

导致

{ A = (0,0), B = (1,10) }
{ A = (1,10), B = (2,20) }
{ A = (2,20), B = (3,30) }
{ A = (3,30), B = (4,40) }
{ A = (4,40), B = (5,50) }
{ A = (5,50), B = (6,60) }
{ A = (6,60), B = (7,70) }
{ A = (7,70), B = (8,80) }
{ A = (8,80), B = (9,90) }

给定 ToString 方法 MyPoint

更新:

根据评论,为了避免对订阅产生不必要的副作用,原始序列必须在压缩前 Published。因此, 使用 Scan 可能是您应该使用的。

使用詹姆斯的 CombineWithPrevious:

var res = pointObservable
    .CombineWithPrevious((p1, p2) => new { A = p1, B = p2 })
    .Skip(1);

给出相同的结果

或 Engimativity 的更简洁版本:

var res = pointObservable
    .Publish(po => 
        po.Zip(
            po.Skip(1),
            (p1, p2) => new { A = p1, B = p2 }
        )
    );

你想要Scan:

points
    .Scan((LineSegment)null, (prev, point) => new LineSegment(prev == null ? point : prev.End, point))
    .Skip(1) // skip the first line segment which will not be valid
    .Subscribe(lineSegment => ... );

Observable.Scan 是折叠或比较当前和先前项目的最简单方法。我写了这篇博文 here with some nice diagrams。这是那篇文章的代码,其中有一个具体的例子。扩展方法非常灵活,它适用于任何源和结果类型:

public static IObservable<TResult> CombineWithPrevious<TSource,TResult>(
    this IObservable<TSource> source,
    Func<TSource, TSource, TResult> resultSelector)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (previous, current) => Tuple.Create(previous.Item2, current))
        .Select(t => resultSelector(t.Item1, t.Item2));
}

因此,如果您有像这样的复杂类型 Delta:

public class Delta
{
    public Point P1 { get;set; }
    public Point P2 { get;set; }

    public static Delta Create(Point P1, Point P2)
    {
        return new Delta {
            P1 = P1,
            P2 = P2
        };
    }

    public override string ToString()
    {
        return string.Format("Delta is (" + (P2.X - P1.X)
            + "," + (P2.Y - P1.Y) + ")");
    }
}

您可以如下使用:

Subject<Point> ps = new Subject<Point>();

ps.CombineWithPrevious(Delta.Create)
  .Subscribe(d => Console.WriteLine(d));

ps.OnNext(new Point(1,1));
ps.OnNext(new Point(2,2));
ps.OnNext(new Point(2,3));

您的输出将是:

Delta is (0,0)
Delta is (1,1)
Delta is (1,1)
Delta is (2,3)

请注意 default(TSource) 用于设置初始默认值 - 您可以轻松修改它以指定初始默认值,或者在结果选择器中处理它,或者跳过第一个元素等(.Skip(1)) - 有很多选择。