C# Reactive Extensions - 内存管理和 Distinct 运算符

C# Reactive Extensions - Memory management and the Distinct operator

示例代码:

   public static IObservable<Order> ObserveOrders(this IProxy proxy,IEqualityComparer<Order> distinctPerDayComparer )
   {
        return Observable.FromEvent<Order>(ev => proxy.OrderUpdated += ev,ev => proxy.OrderUpdated -= ev)
                    .Distinct(distinctPerDayComparer);
   } 

   public class DistinctPerDayComparer : IEqualityComparer<Order>
   {
        public bool Equals(Order o1, Order o2)
        {
            if(o1.Id != o2.Id)
               return false;

            bool isSameDay = o1.Date.Day == o2.Date.Day;
            return isSameDay;
        }            

        public int GetHashCode(Order o)
        {
           return o.Id.GetHashCode();
        }
   }

public class Order
{
    public int Id { get; set; }
    public DateTime Date { get; set; }
} 

场景:

序列:

 {id:1,D:'5/25/2016'}-{id:1,D:'5/25/2016'}-{id:2,D:'5/25/2016'}-{id:1 ,D:'5/26/2016'}

不同的序列:

 {id:1,D:'5/25/2016'}-{id:2,D:'5/25/2016'}-{id:1,D:'5/26/2016'}

现在假设这个序列很长 运行,实际上永远不会调用 onComplete。

Rx 是如何管理它的,所以它不会在内存中保存所有不同的元素来与它进行比较?

我猜它为管道中的元素保留了一些后备存储空间。但我一直认为,在使用下一个项目调用 onNext 之后,该项目将被简单地处理掉。

仍然,如果它被处置,Rx 在调用 Distinct 运算符时将哪些元素用于 EqualityComparer ?

如果您查看 Rx 源代码,您会发现 distinct 使用哈希集并将值存储在其中。您认为物品只是被丢弃的假设是不正确的。

如果您的订单对象很重,您可以使用键选择器,RX 只会将该值存储在哈希集中。

.Distinct(o => Tuple.Create(o.id, o.Date), distinctPerDayComparer);

然后需要更改 distinctPerDayComparer

public class DistinctPerDayComparer : IEqualityComparer<Tuple<int, DateTime>>
{
    public bool Equals(Tuple<int, DateTime> o1, Tuple<int, DateTime> o2)
    {
        if(o1.Item1 != o2.Item1)
           return false;

        bool isSameDay = o1.Item2.Day == o2.Item2.Day;
        return isSameDay;
    }            

    public int GetHashCode(Tuple<int, DateTime> o)
    {
       return o.Item1.GetHashCode();
    }
}

没有测试代码,但应该是一个起点。现在将存储元组,直到序列完成,而不是您的 Order 对象。

否则,您可以使用 Window 函数对它们进行分组并按计划清理它们,但对于整个可观察序列来说,它并不是真正不同的。