BehaviorSubject 元素的顺序与预期输出相反

Order of BehaviorSubject elements is reversing expected output

我有以下代码创建事件,在下面解释的情况下,从后到前发出:

var feed = new Subject<double>();
var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });

levels
    .Select(thresholds => feed
        .Buffer(2, 1)
        .Where(x => x.Count == 2)
        .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
        .Where(x => x.LevelsCrossed.Any())
        .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
    .Switch()
    .DistinctUntilChanged(x => x.Threshold)
    .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

feed.OnNext(520.0);
feed.OnNext(400.0);
feed.OnNext(450.0);
feed.OnNext(650.0);  

交叉检测扩展如下:

public static class ThresholdExtensions
{
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        return self
            .Where(level => level >= previous && level <= current || level <= previous && level >= current);
    }
}

前2个发射对象顺序,第2个发射对象向后

// the order of first 2 emitted objects should be reversed
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}

// the below ordered is OK
{"Threshold":400.0,"Previous":400.0,"Current":450.0,"SlopeDirection":1}
{"Threshold":500.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}

前 2 个元素的反转输出与级别的顺序有关 BehaviorSubject - 但反转此顺序会导致最后 3 个元素反转。

如何修改 DetectThresholds 以在传入的 feed 穿过时以正确的顺序发射对象?

上面的工作示例如下:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ConsoleApp1
{
    public class ThresholdCrossedEvent
    {
        public ThresholdCrossedEvent(double level, double previous, double current)
        {

            Threshold = level;
            Previous = previous;
            Current = current;
        }

        public double Threshold { get; set; }
        public double Previous { get; set; }
        public double Current { get; set; }
        public int SlopeDirection => Current >= Previous ? 1 : -1;
    }

    public static class ThresholdExtensions
    {
        public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
        {
            return self
                .Where(level => level >= previous && level <= current || level <= previous && level >= current);
        }
    }

    class Program
    {

        static void Main(string[] args)
        {
            var feed = new Subject<double>();
            var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });

            levels
                .Select(thresholds => feed
                    .Buffer(2, 1)
                    .Where(x => x.Count == 2)
                    .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
                    .Where(x => x.LevelsCrossed.Any())
                    .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
                .Switch()
                .DistinctUntilChanged(x => x.Threshold)
                .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

            feed.OnNext(520.0);
            feed.OnNext(400.0);
            feed.OnNext(450.0);
            feed.OnNext(650.0);            

            Console.ReadKey();
        }
    }
}

订购交叉 LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]).OrderByWithDirection(x => x, x[0] > x[1]) 需要以下内容:

static class OrderByExtensions
{
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>
           (this IEnumerable<TSource> source,
            Func<TSource, TKey> keySelector,
            bool descending)
    {
        return descending ? source.OrderByDescending(keySelector)
                          : source.OrderBy(keySelector);
    }

    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>
        (this IQueryable<TSource> source,
         Expression<Func<TSource, TKey>> keySelector,
         bool descending)
    {
        return descending ? source.OrderByDescending(keySelector)
                          : source.OrderBy(keySelector);
    }
}

levels
    .Select(thresholds => feed
        .Buffer(2, 1)
        .Where(x => x.Count == 2)
        .Select(x => new {
                       LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]).OrderByWithDirection(x => x, x[0] > x[1]),
                       Previous = x[0],
                       Current = x[1] })                    
        .Where(x => x.LevelsCrossed.Any())
        .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
     .Switch()
     .DistinctUntilChanged(x => x.Threshold)
     .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

使用 { 400.0, 500.0, 600.0, 700.0 } 的级别输出现在是正确的:

{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":700.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}

下面的例子:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace ConsoleApp1
{
    public class ThresholdCrossedEvent
    {
        public ThresholdCrossedEvent(double level, double previous, double current)
        {

            Threshold = level;
            Previous = previous;
            Current = current;
        }

        public double Threshold { get; set; }
        public double Previous { get; set; }
        public double Current { get; set; }
        public int SlopeDirection => Current >= Previous ? 1 : -1;
    }

    public static class ThresholdExtensions
    {
        public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
        {
            return self
                .Where(level => level >= previous && level <= current || level <= previous && level >= current);
        }
    }

    static class OrderByExtensions
    {
        public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>
               (this IEnumerable<TSource> source,
                Func<TSource, TKey> keySelector,
                bool descending)
        {
            return descending ? source.OrderByDescending(keySelector)
                              : source.OrderBy(keySelector);
        }

        public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>
            (this IQueryable<TSource> source,
             Expression<Func<TSource, TKey>> keySelector,
             bool descending)
        {
            return descending ? source.OrderByDescending(keySelector)
                              : source.OrderBy(keySelector);
        }
    }

    class Program
    {
   
        static void Main(string[] args)
        {
            var feed = new Subject<double>();
            var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });

            levels
                .Select(thresholds => feed
                    .Buffer(2, 1)
                    .Where(x => x.Count == 2)
                    .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]).OrderByWithDirection(x => x, x[0] > x[1]), Previous = x[0], Current = x[1] })                    
                    .Where(x => x.LevelsCrossed.Any())
                    .SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
                .Switch()
                .DistinctUntilChanged(x => x.Threshold)
                .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

            feed.OnNext(520.0);
            feed.OnNext(400.0);
            feed.OnNext(450.0);
            feed.OnNext(750.0);            

            Console.ReadKey();
        }
    }
}