如何正确观察非标准事件?

How to properly observe non-standard events?

我是 Reactive Extensions 的新手,正在处理具有如下定义的事件的 COM 库:

public delegate void MyDelegate(int requestId, double price, int amount);
public event MyDelegate MyEvent;

如何正确观察这个?我尝试使用 Observable.FromEvent(),但由于事件的参数不是 EventArgs 类型,我不知道 FromEvent()FromEventPattern() 是如何工作的。

我目前的解决方法是将自定义委托附加到事件,然后调用 Subject.OnNext(),但我猜我不应该这样做。

这是我当前解决方法的示例:

        MyEvent += new MyDelegate((int requestId, double price, int amount) =>
        {
            Task.Run(() =>
            {
                var args = new MyArgs()
                {
                    requestId = requestId,
                    price = price,
                    amount = amount,
                };
                this.mySubject.OnNext(args);
            });
        });

它有一个特殊的 FromEvent 重载。有点傻,但函数签名看起来像:

IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(Func<Action<TEventArgs>, TDelegate> conversion, 
                                                         Action<TDelegate> addHandler, 
                                                         Action<TDelegate> removeHandler);

转换函数是这里的重要部分,基本上您是在告诉 Rx 您的委托如何映射到具体类型。

在您的场景中,它最终看起来像这样:

Observable.FromEvent<MyDelegate, MyArgs>(
  converter => new MyDelegate(
                  (id, price, amount) => converter(new MyArgs { 
                                                        RequestId = id, 
                                                        Price = price, 
                                                        Amount = amount
                                                       })
               ),
  handler => MyEvent += handler,
  handler => MyEvent -= handler);

那么这一切都在做什么?在内部,它类似于您正在做的事情(我将从概念上解释它的作用,因为实现稍微复杂一些)。进行新订阅时,将使用作为 converter 参数传入的 observer.OnNext 调用转换函数。这个 lambda 将 return 一个新的 MyDelegate 实例来包装我们提供的转换函数 ((id, price, amount) => ...)。这就是随后传递给 handler => MyEvent += handler 方法的内容。

之后,每次触发事件时,它都会调用我们的 lambda 方法并将传递的参数转换为 MyArgs 的实例,然后将其传递给 converter/observer.OnNext

此外,除此之外,它还会在您完成事件处理程序后负责清理事件处理程序,优雅地将异常传递到下游,并通过在多个事件处理程序之间共享单个事件处理程序来管理头顶内存观察员。

Source code