为新订阅者发布事件的最后一个值

Publish last value of event for new subscribers

我有一个 class Foo 和一个发布 FooState 枚举的事件。我想把这个事件变成一个可观察的,为新订阅者重播最后一个值。

即使没有订阅者,任何新订阅者也应该获得最后一个值。

public enum FooState
{
    Stopped = 0,
    Starting = 1,
    Running = 2,        
}

public delegate void FooEventHandler(object sender, FooEventArgs e);

public class FooEventArgs : EventArgs
{
    public FooEventArgs(FooState fooState)
    {
        this.State = fooState;
    }

    public FooState State {get; private set;}
}

public class Foo
{
    public event FooEventHandler FooEvent;

    public void OnFooEvent(FooState state)
    {
        var fooEvent = FooEvent;

        if(fooEvent != null)
        {
            fooEvent(this, new FooEventArgs(state));
        }
    }
}

到目前为止,我的尝试围绕着使用 PublishRefCountReplay。但是,如果我在触发事件后订阅可观察对象,我尝试的 none 组合会起作用。

Replay(1).RefCount() 只要已经有至少一个订阅就可以工作,但我也需要为第一个延迟订阅工作。

var foo = new Foo();

   var obs =  Observable.FromEventPattern<FooEventHandler, FooEventArgs>(
                                        h => foo.FooEvent += h,
                                        h => foo.FooEvent -= h)
                                    .DistinctUntilChanged()
                                    .Replay(1)
                                    .RefCount();

    // Works if this line is uncomented.
    //obs.Subscribe(x => Console.WriteLine("Early Subscriber = " + x.EventArgs.State));

    foo.OnFooEvent(FooState.Running);

    obs.Subscribe(x => Console.WriteLine("Late Subscriber = " + x.EventArgs.State));

有谁知道如何用 Rx 做到这一点?

Rx 正在做正确的事情,将您的事件通知转换为您的流并重播它们,但您要问的是: "Why when I subscribe to the event, don't I get the initial state".

事件不是那样的。如果我在 foo.FooEvent 上执行 +=,我不会立即触发当前值。我只会在它发生变化时收到通知。 正如您所注意到的,'Replay' 将重播后续事件,但不提供订阅时的状态。

要解决您的问题,您需要确保在连接流以获取更改通知之前将当前值放入流中。 查看 Observable.StartWith()。

即在 the.DistinctUntilChanged() 调用之前执行“.StartWith(foo.State)”(紧接在 .FromEventPattern 之后)。

RefCount 仅在第一次订阅后连接。如果你想对连接发生的时间进行细粒度控制,你应该使用 Replay + Connect.

所以这样做:

var publishedSource = eventSource.DistinctUntilChanged().Replay(1);

var connection = publishedSource.Connect();

//Subscribe to publishedSource to receive events and dispose of 
connection when you are done.

发自我的 phone 对于任何语法错误提前致歉。