BehaviorSubject 元素的顺序与预期输出相反
Order of BehaviorSubject elements is reversing expected output
我有以下代码用于生成事件;在下文说明的情况下,这些事件会以相反的顺序(从后到前)发出:
// Source of incoming values that are checked against the threshold levels.
var feed = new Subject<double>();
// Threshold levels, published through a BehaviorSubject seeded with an initial array.
var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
.Select(thresholds => feed
// Buffer(count: 2, skip: 1): pairs of consecutive feed values, x[0] = previous, x[1] = current.
.Buffer(2, 1)
// Ignore any buffer that does not hold a full pair.
.Where(x => x.Count == 2)
// GetCrossovers filters `thresholds` with Where, so the crossed levels keep the array's order
// regardless of whether the feed moved up or down -- this is the ordering problem being asked about.
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
// Drop pairs that crossed no level.
.Where(x => x.LevelsCrossed.Any())
// Emit one ThresholdCrossedEvent per crossed level.
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
// Flatten the stream of queries, following only the query for the latest threshold array.
.Switch()
// Suppress an event whose Threshold equals that of the event emitted immediately before it.
.DistinctUntilChanged(x => x.Threshold)
// Print each remaining event as JSON.
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
// Test input: one downward move (520 -> 400) followed by two upward moves.
feed.OnNext(520.0);
feed.OnNext(400.0);
feed.OnNext(450.0);
feed.OnNext(650.0);
交叉检测扩展如下:
/// <summary>
/// Helpers for finding which threshold levels lie between two consecutive values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Returns the levels in <paramref name="self"/> that lie within the closed range spanned by
    /// <paramref name="previous"/> and <paramref name="current"/>, whichever of the two is larger.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>A lazily evaluated sequence of matching levels, in the order they appear in <paramref name="self"/>.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        return from level in self
               where (previous <= level && level <= current)
                  || (current <= level && level <= previous)
               select level;
    }
}
实际输出如下:前 2 个发出的对象顺序颠倒了(应先发出 500,再发出 400):
// the order of first 2 emitted objects should be reversed
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
// the below ordered is OK
{"Threshold":400.0,"Previous":400.0,"Current":450.0,"SlopeDirection":1}
{"Threshold":500.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}
前 2 个元素的顺序颠倒与 levels 这个 BehaviorSubject 中各级别的排列顺序有关
- 但如果把该顺序反过来,又会导致最后 3 个元素颠倒。
应如何修改阈值检测逻辑(DetectThresholds),
使其在传入的 feed 穿过各级别时按正确的顺序发出对象?
上面的工作示例如下:
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApp1
{
/// <summary>
/// Describes one threshold level together with the pair of values between which it was detected.
/// </summary>
public class ThresholdCrossedEvent
{
    // Property declaration order is kept as-is: the sample JSON output lists them in this order.

    /// <summary>The threshold level this event refers to.</summary>
    public double Threshold { get; set; }

    /// <summary>The earlier value of the pair.</summary>
    public double Previous { get; set; }

    /// <summary>The later value of the pair.</summary>
    public double Current { get; set; }

    /// <summary>
    /// 1 when <see cref="Current"/> is greater than or equal to <see cref="Previous"/>; otherwise -1.
    /// </summary>
    public int SlopeDirection
    {
        get
        {
            if (this.Current >= this.Previous)
            {
                return 1;
            }

            return -1;
        }
    }

    /// <summary>Creates an event for <paramref name="level"/> detected between two values.</summary>
    /// <param name="level">The threshold level; stored in <see cref="Threshold"/>.</param>
    /// <param name="previous">The earlier value; stored in <see cref="Previous"/>.</param>
    /// <param name="current">The later value; stored in <see cref="Current"/>.</param>
    public ThresholdCrossedEvent(double level, double previous, double current)
    {
        this.Threshold = level;
        this.Previous = previous;
        this.Current = current;
    }
}
/// <summary>
/// Extension methods for testing threshold levels against a pair of values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Selects the levels from <paramref name="self"/> that fall inside the inclusive interval
    /// bounded by <paramref name="previous"/> and <paramref name="current"/>, in either direction.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>The matching levels, lazily evaluated and in the array's own order.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        IEnumerable<double> crossed = self.Where(candidate =>
            (previous <= candidate && candidate <= current) ||
            (current <= candidate && candidate <= previous));

        return crossed;
    }
}
/// <summary>
/// Console program reproducing the ordering problem: crossed levels are emitted in the
/// order of the threshold array even when the feed moves downward.
/// </summary>
class Program
{
/// <summary>
/// Wires the threshold-crossing query, pushes four sample values and waits for a key press.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
// Source of incoming values that are checked against the threshold levels.
var feed = new Subject<double>();
// Threshold levels, published through a BehaviorSubject seeded with an initial array.
var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
.Select(thresholds => feed
// Buffer(count: 2, skip: 1): pairs of consecutive feed values, x[0] = previous, x[1] = current.
.Buffer(2, 1)
// Ignore any buffer that does not hold a full pair.
.Where(x => x.Count == 2)
// GetCrossovers filters `thresholds` with Where, so the crossed levels keep the array's order
// regardless of whether the feed moved up or down.
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
// Drop pairs that crossed no level.
.Where(x => x.LevelsCrossed.Any())
// Emit one ThresholdCrossedEvent per crossed level.
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
// Flatten the stream of queries, following only the query for the latest threshold array.
.Switch()
// Suppress an event whose Threshold equals that of the event emitted immediately before it.
.DistinctUntilChanged(x => x.Threshold)
// Print each remaining event as JSON.
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
// Test input: one downward move (520 -> 400) followed by two upward moves.
feed.OnNext(520.0);
feed.OnNext(400.0);
feed.OnNext(450.0);
feed.OnNext(650.0);
// Keep the console window open until a key is pressed.
Console.ReadKey();
}
}
}
需要按 feed 的移动方向对被穿过的级别进行排序:LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]).OrderByWithDirection(x => x, x[0] > x[1])
这需要以下扩展方法:
/// <summary>
/// Ordering helpers that pick ascending or descending order from a boolean flag.
/// </summary>
static class OrderByExtensions
{
    /// <summary>
    /// Orders an in-memory sequence by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The sequence to order.</param>
    /// <param name="keySelector">Extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered sequence.</returns>
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }

    /// <summary>
    /// Orders a queryable source by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The query to order.</param>
    /// <param name="keySelector">Expression that extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered query.</returns>
    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, TKey>> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }
}
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
    .Select(thresholds => feed
        // Buffer(count: 2, skip: 1): pairs of consecutive feed values, pair[0] = previous, pair[1] = current.
        .Buffer(2, 1)
        // Ignore any buffer that does not hold a full pair.
        .Where(pair => pair.Count == 2)
        .Select(pair => new
        {
            // Sort the crossed levels in the direction of travel: descending when the feed fell,
            // ascending otherwise. The key-selector parameter has its own name (`level`) so it does
            // not redeclare the enclosing lambda parameter, which is error CS0136 before C# 8.
            // ToArray() materializes the result once; it is read twice below (Any, then SelectMany),
            // and a deferred OrderBy would otherwise sort again on each read.
            LevelsCrossed = thresholds
                .GetCrossovers(pair[0], pair[1])
                .OrderByWithDirection(level => level, pair[0] > pair[1])
                .ToArray(),
            Previous = pair[0],
            Current = pair[1]
        })
        // Drop pairs that crossed no level.
        .Where(step => step.LevelsCrossed.Any())
        // Emit one ThresholdCrossedEvent per crossed level, in the sorted order.
        .SelectMany(step => step.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, step.Previous, step.Current))))
    // Flatten the stream of queries, following only the query for the latest threshold array.
    .Switch()
    // Suppress an event whose Threshold equals that of the event emitted immediately before it.
    .DistinctUntilChanged(crossing => crossing.Threshold)
    // Print each remaining event as JSON.
    .Subscribe(crossing => Console.WriteLine(JsonConvert.SerializeObject(crossing)));
使用级别 { 400.0, 500.0, 600.0, 700.0 } 时(注意本例中最后一个 feed 值为 750.0),
输出现在是正确的:
{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":700.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
下面的例子:
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApp1
{
/// <summary>
/// Describes one threshold level together with the pair of values between which it was detected.
/// </summary>
public class ThresholdCrossedEvent
{
    // Property declaration order is kept as-is: the sample JSON output lists them in this order.

    /// <summary>The threshold level this event refers to.</summary>
    public double Threshold { get; set; }

    /// <summary>The earlier value of the pair.</summary>
    public double Previous { get; set; }

    /// <summary>The later value of the pair.</summary>
    public double Current { get; set; }

    /// <summary>
    /// 1 when <see cref="Current"/> is greater than or equal to <see cref="Previous"/>; otherwise -1.
    /// </summary>
    public int SlopeDirection
    {
        get
        {
            if (this.Current >= this.Previous)
            {
                return 1;
            }

            return -1;
        }
    }

    /// <summary>Creates an event for <paramref name="level"/> detected between two values.</summary>
    /// <param name="level">The threshold level; stored in <see cref="Threshold"/>.</param>
    /// <param name="previous">The earlier value; stored in <see cref="Previous"/>.</param>
    /// <param name="current">The later value; stored in <see cref="Current"/>.</param>
    public ThresholdCrossedEvent(double level, double previous, double current)
    {
        this.Threshold = level;
        this.Previous = previous;
        this.Current = current;
    }
}
/// <summary>
/// Extension methods for testing threshold levels against a pair of values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Selects the levels from <paramref name="self"/> that fall inside the inclusive interval
    /// bounded by <paramref name="previous"/> and <paramref name="current"/>, in either direction.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>The matching levels, lazily evaluated and in the array's own order.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        IEnumerable<double> crossed = self.Where(candidate =>
            (previous <= candidate && candidate <= current) ||
            (current <= candidate && candidate <= previous));

        return crossed;
    }
}
/// <summary>
/// Ordering helpers that pick ascending or descending order from a boolean flag.
/// </summary>
static class OrderByExtensions
{
    /// <summary>
    /// Orders an in-memory sequence by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The sequence to order.</param>
    /// <param name="keySelector">Extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered sequence.</returns>
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }

    /// <summary>
    /// Orders a queryable source by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The query to order.</param>
    /// <param name="keySelector">Expression that extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered query.</returns>
    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, TKey>> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }
}
/// <summary>
/// Console program demonstrating direction-aware ordering of threshold-crossing events.
/// </summary>
class Program
{
    /// <summary>
    /// Wires the threshold-crossing query, pushes four sample values and waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Source of incoming values that are checked against the threshold levels.
        var feed = new Subject<double>();
        // Threshold levels, published through a BehaviorSubject seeded with an initial array.
        var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });

        // For each threshold array published on `levels`, build a crossing-detection query over `feed`.
        levels
            .Select(thresholds => feed
                // Buffer(count: 2, skip: 1): pairs of consecutive feed values, pair[0] = previous, pair[1] = current.
                .Buffer(2, 1)
                // Ignore any buffer that does not hold a full pair.
                .Where(pair => pair.Count == 2)
                .Select(pair => new
                {
                    // Sort the crossed levels in the direction of travel: descending when the feed fell,
                    // ascending otherwise. The key-selector parameter has its own name (`level`) so it does
                    // not redeclare the enclosing lambda parameter, which is error CS0136 before C# 8.
                    // ToArray() materializes the result once; it is read twice below (Any, then SelectMany),
                    // and a deferred OrderBy would otherwise sort again on each read.
                    LevelsCrossed = thresholds
                        .GetCrossovers(pair[0], pair[1])
                        .OrderByWithDirection(level => level, pair[0] > pair[1])
                        .ToArray(),
                    Previous = pair[0],
                    Current = pair[1]
                })
                // Drop pairs that crossed no level.
                .Where(step => step.LevelsCrossed.Any())
                // Emit one ThresholdCrossedEvent per crossed level, in the sorted order.
                .SelectMany(step => step.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, step.Previous, step.Current))))
            // Flatten the stream of queries, following only the query for the latest threshold array.
            .Switch()
            // Suppress an event whose Threshold equals that of the event emitted immediately before it.
            .DistinctUntilChanged(crossing => crossing.Threshold)
            // Print each remaining event as JSON.
            .Subscribe(crossing => Console.WriteLine(JsonConvert.SerializeObject(crossing)));

        // Test input: one downward move (520 -> 400) followed by two upward moves.
        feed.OnNext(520.0);
        feed.OnNext(400.0);
        feed.OnNext(450.0);
        feed.OnNext(750.0);

        // Keep the console window open until a key is pressed.
        Console.ReadKey();
    }
}
}
我有以下代码用于生成事件;在下文说明的情况下,这些事件会以相反的顺序(从后到前)发出:
// Source of incoming values that are checked against the threshold levels.
var feed = new Subject<double>();
// Threshold levels, published through a BehaviorSubject seeded with an initial array.
var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
.Select(thresholds => feed
// Buffer(count: 2, skip: 1): pairs of consecutive feed values, x[0] = previous, x[1] = current.
.Buffer(2, 1)
// Ignore any buffer that does not hold a full pair.
.Where(x => x.Count == 2)
// GetCrossovers filters `thresholds` with Where, so the crossed levels keep the array's order
// regardless of whether the feed moved up or down -- this is the ordering problem being asked about.
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
// Drop pairs that crossed no level.
.Where(x => x.LevelsCrossed.Any())
// Emit one ThresholdCrossedEvent per crossed level.
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
// Flatten the stream of queries, following only the query for the latest threshold array.
.Switch()
// Suppress an event whose Threshold equals that of the event emitted immediately before it.
.DistinctUntilChanged(x => x.Threshold)
// Print each remaining event as JSON.
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
// Test input: one downward move (520 -> 400) followed by two upward moves.
feed.OnNext(520.0);
feed.OnNext(400.0);
feed.OnNext(450.0);
feed.OnNext(650.0);
交叉检测扩展如下:
/// <summary>
/// Helpers for finding which threshold levels lie between two consecutive values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Returns the levels in <paramref name="self"/> that lie within the closed range spanned by
    /// <paramref name="previous"/> and <paramref name="current"/>, whichever of the two is larger.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>A lazily evaluated sequence of matching levels, in the order they appear in <paramref name="self"/>.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        return from level in self
               where (previous <= level && level <= current)
                  || (current <= level && level <= previous)
               select level;
    }
}
实际输出如下:前 2 个发出的对象顺序颠倒了(应先发出 500,再发出 400):
// the order of first 2 emitted objects should be reversed
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
// the below ordered is OK
{"Threshold":400.0,"Previous":400.0,"Current":450.0,"SlopeDirection":1}
{"Threshold":500.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":650.0,"SlopeDirection":1}
前 2 个元素的顺序颠倒与 levels 这个 BehaviorSubject 中各级别的排列顺序有关
- 但如果把该顺序反过来,又会导致最后 3 个元素颠倒。
应如何修改阈值检测逻辑(DetectThresholds),
使其在传入的 feed 穿过各级别时按正确的顺序发出对象?
上面的工作示例如下:
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApp1
{
/// <summary>
/// Describes one threshold level together with the pair of values between which it was detected.
/// </summary>
public class ThresholdCrossedEvent
{
    // Property declaration order is kept as-is: the sample JSON output lists them in this order.

    /// <summary>The threshold level this event refers to.</summary>
    public double Threshold { get; set; }

    /// <summary>The earlier value of the pair.</summary>
    public double Previous { get; set; }

    /// <summary>The later value of the pair.</summary>
    public double Current { get; set; }

    /// <summary>
    /// 1 when <see cref="Current"/> is greater than or equal to <see cref="Previous"/>; otherwise -1.
    /// </summary>
    public int SlopeDirection
    {
        get
        {
            if (this.Current >= this.Previous)
            {
                return 1;
            }

            return -1;
        }
    }

    /// <summary>Creates an event for <paramref name="level"/> detected between two values.</summary>
    /// <param name="level">The threshold level; stored in <see cref="Threshold"/>.</param>
    /// <param name="previous">The earlier value; stored in <see cref="Previous"/>.</param>
    /// <param name="current">The later value; stored in <see cref="Current"/>.</param>
    public ThresholdCrossedEvent(double level, double previous, double current)
    {
        this.Threshold = level;
        this.Previous = previous;
        this.Current = current;
    }
}
/// <summary>
/// Extension methods for testing threshold levels against a pair of values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Selects the levels from <paramref name="self"/> that fall inside the inclusive interval
    /// bounded by <paramref name="previous"/> and <paramref name="current"/>, in either direction.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>The matching levels, lazily evaluated and in the array's own order.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        IEnumerable<double> crossed = self.Where(candidate =>
            (previous <= candidate && candidate <= current) ||
            (current <= candidate && candidate <= previous));

        return crossed;
    }
}
/// <summary>
/// Console program reproducing the ordering problem: crossed levels are emitted in the
/// order of the threshold array even when the feed moves downward.
/// </summary>
class Program
{
/// <summary>
/// Wires the threshold-crossing query, pushes four sample values and waits for a key press.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
// Source of incoming values that are checked against the threshold levels.
var feed = new Subject<double>();
// Threshold levels, published through a BehaviorSubject seeded with an initial array.
var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
.Select(thresholds => feed
// Buffer(count: 2, skip: 1): pairs of consecutive feed values, x[0] = previous, x[1] = current.
.Buffer(2, 1)
// Ignore any buffer that does not hold a full pair.
.Where(x => x.Count == 2)
// GetCrossovers filters `thresholds` with Where, so the crossed levels keep the array's order
// regardless of whether the feed moved up or down.
.Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
// Drop pairs that crossed no level.
.Where(x => x.LevelsCrossed.Any())
// Emit one ThresholdCrossedEvent per crossed level.
.SelectMany(x => x.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, x.Previous, x.Current))))
// Flatten the stream of queries, following only the query for the latest threshold array.
.Switch()
// Suppress an event whose Threshold equals that of the event emitted immediately before it.
.DistinctUntilChanged(x => x.Threshold)
// Print each remaining event as JSON.
.Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
// Test input: one downward move (520 -> 400) followed by two upward moves.
feed.OnNext(520.0);
feed.OnNext(400.0);
feed.OnNext(450.0);
feed.OnNext(650.0);
// Keep the console window open until a key is pressed.
Console.ReadKey();
}
}
}
需要按 feed 的移动方向对被穿过的级别进行排序:LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]).OrderByWithDirection(x => x, x[0] > x[1])
这需要以下扩展方法:
/// <summary>
/// Ordering helpers that pick ascending or descending order from a boolean flag.
/// </summary>
static class OrderByExtensions
{
    /// <summary>
    /// Orders an in-memory sequence by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The sequence to order.</param>
    /// <param name="keySelector">Extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered sequence.</returns>
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }

    /// <summary>
    /// Orders a queryable source by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The query to order.</param>
    /// <param name="keySelector">Expression that extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered query.</returns>
    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, TKey>> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }
}
// For each threshold array published on `levels`, build a crossing-detection query over `feed`.
levels
    .Select(thresholds => feed
        // Buffer(count: 2, skip: 1): pairs of consecutive feed values, pair[0] = previous, pair[1] = current.
        .Buffer(2, 1)
        // Ignore any buffer that does not hold a full pair.
        .Where(pair => pair.Count == 2)
        .Select(pair => new
        {
            // Sort the crossed levels in the direction of travel: descending when the feed fell,
            // ascending otherwise. The key-selector parameter has its own name (`level`) so it does
            // not redeclare the enclosing lambda parameter, which is error CS0136 before C# 8.
            // ToArray() materializes the result once; it is read twice below (Any, then SelectMany),
            // and a deferred OrderBy would otherwise sort again on each read.
            LevelsCrossed = thresholds
                .GetCrossovers(pair[0], pair[1])
                .OrderByWithDirection(level => level, pair[0] > pair[1])
                .ToArray(),
            Previous = pair[0],
            Current = pair[1]
        })
        // Drop pairs that crossed no level.
        .Where(step => step.LevelsCrossed.Any())
        // Emit one ThresholdCrossedEvent per crossed level, in the sorted order.
        .SelectMany(step => step.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, step.Previous, step.Current))))
    // Flatten the stream of queries, following only the query for the latest threshold array.
    .Switch()
    // Suppress an event whose Threshold equals that of the event emitted immediately before it.
    .DistinctUntilChanged(crossing => crossing.Threshold)
    // Print each remaining event as JSON.
    .Subscribe(crossing => Console.WriteLine(JsonConvert.SerializeObject(crossing)));
使用级别 { 400.0, 500.0, 600.0, 700.0 } 时(注意本例中最后一个 feed 值为 750.0),
输出现在是正确的:
{"Threshold":500.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":400.0,"Previous":520.0,"Current":400.0,"SlopeDirection":-1}
{"Threshold":500.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":600.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
{"Threshold":700.0,"Previous":450.0,"Current":750.0,"SlopeDirection":1}
下面的例子:
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace ConsoleApp1
{
/// <summary>
/// Describes one threshold level together with the pair of values between which it was detected.
/// </summary>
public class ThresholdCrossedEvent
{
    // Property declaration order is kept as-is: the sample JSON output lists them in this order.

    /// <summary>The threshold level this event refers to.</summary>
    public double Threshold { get; set; }

    /// <summary>The earlier value of the pair.</summary>
    public double Previous { get; set; }

    /// <summary>The later value of the pair.</summary>
    public double Current { get; set; }

    /// <summary>
    /// 1 when <see cref="Current"/> is greater than or equal to <see cref="Previous"/>; otherwise -1.
    /// </summary>
    public int SlopeDirection
    {
        get
        {
            if (this.Current >= this.Previous)
            {
                return 1;
            }

            return -1;
        }
    }

    /// <summary>Creates an event for <paramref name="level"/> detected between two values.</summary>
    /// <param name="level">The threshold level; stored in <see cref="Threshold"/>.</param>
    /// <param name="previous">The earlier value; stored in <see cref="Previous"/>.</param>
    /// <param name="current">The later value; stored in <see cref="Current"/>.</param>
    public ThresholdCrossedEvent(double level, double previous, double current)
    {
        this.Threshold = level;
        this.Previous = previous;
        this.Current = current;
    }
}
/// <summary>
/// Extension methods for testing threshold levels against a pair of values.
/// </summary>
public static class ThresholdExtensions
{
    /// <summary>
    /// Selects the levels from <paramref name="self"/> that fall inside the inclusive interval
    /// bounded by <paramref name="previous"/> and <paramref name="current"/>, in either direction.
    /// </summary>
    /// <param name="self">The threshold levels to test.</param>
    /// <param name="previous">The earlier value.</param>
    /// <param name="current">The later value.</param>
    /// <returns>The matching levels, lazily evaluated and in the array's own order.</returns>
    public static IEnumerable<double> GetCrossovers(this double[] self, double previous, double current)
    {
        IEnumerable<double> crossed = self.Where(candidate =>
            (previous <= candidate && candidate <= current) ||
            (current <= candidate && candidate <= previous));

        return crossed;
    }
}
/// <summary>
/// Ordering helpers that pick ascending or descending order from a boolean flag.
/// </summary>
static class OrderByExtensions
{
    /// <summary>
    /// Orders an in-memory sequence by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The sequence to order.</param>
    /// <param name="keySelector">Extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered sequence.</returns>
    public static IOrderedEnumerable<TSource> OrderByWithDirection<TSource, TKey>(
        this IEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }

    /// <summary>
    /// Orders a queryable source by <paramref name="keySelector"/>.
    /// </summary>
    /// <param name="source">The query to order.</param>
    /// <param name="keySelector">Expression that extracts the sort key from each element.</param>
    /// <param name="descending">True for descending order, false for ascending.</param>
    /// <returns>The ordered query.</returns>
    public static IOrderedQueryable<TSource> OrderByWithDirection<TSource, TKey>(
        this IQueryable<TSource> source,
        Expression<Func<TSource, TKey>> keySelector,
        bool descending)
    {
        if (descending)
        {
            return source.OrderByDescending(keySelector);
        }

        return source.OrderBy(keySelector);
    }
}
/// <summary>
/// Console program demonstrating direction-aware ordering of threshold-crossing events.
/// </summary>
class Program
{
    /// <summary>
    /// Wires the threshold-crossing query, pushes four sample values and waits for a key press.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static void Main(string[] args)
    {
        // Source of incoming values that are checked against the threshold levels.
        var feed = new Subject<double>();
        // Threshold levels, published through a BehaviorSubject seeded with an initial array.
        var levels = new BehaviorSubject<double[]>(new[] { 400.0, 500.0, 600.0, 700.0 });

        // For each threshold array published on `levels`, build a crossing-detection query over `feed`.
        levels
            .Select(thresholds => feed
                // Buffer(count: 2, skip: 1): pairs of consecutive feed values, pair[0] = previous, pair[1] = current.
                .Buffer(2, 1)
                // Ignore any buffer that does not hold a full pair.
                .Where(pair => pair.Count == 2)
                .Select(pair => new
                {
                    // Sort the crossed levels in the direction of travel: descending when the feed fell,
                    // ascending otherwise. The key-selector parameter has its own name (`level`) so it does
                    // not redeclare the enclosing lambda parameter, which is error CS0136 before C# 8.
                    // ToArray() materializes the result once; it is read twice below (Any, then SelectMany),
                    // and a deferred OrderBy would otherwise sort again on each read.
                    LevelsCrossed = thresholds
                        .GetCrossovers(pair[0], pair[1])
                        .OrderByWithDirection(level => level, pair[0] > pair[1])
                        .ToArray(),
                    Previous = pair[0],
                    Current = pair[1]
                })
                // Drop pairs that crossed no level.
                .Where(step => step.LevelsCrossed.Any())
                // Emit one ThresholdCrossedEvent per crossed level, in the sorted order.
                .SelectMany(step => step.LevelsCrossed.Select(level => new ThresholdCrossedEvent(level, step.Previous, step.Current))))
            // Flatten the stream of queries, following only the query for the latest threshold array.
            .Switch()
            // Suppress an event whose Threshold equals that of the event emitted immediately before it.
            .DistinctUntilChanged(crossing => crossing.Threshold)
            // Print each remaining event as JSON.
            .Subscribe(crossing => Console.WriteLine(JsonConvert.SerializeObject(crossing)));

        // Test input: one downward move (520 -> 400) followed by two upward moves.
        feed.OnNext(520.0);
        feed.OnNext(400.0);
        feed.OnNext(450.0);
        feed.OnNext(750.0);

        // Keep the console window open until a key is pressed.
        Console.ReadKey();
    }
}
}