为新订阅者发布事件的最后一个值
Publish last value of event for new subscribers
我有一个 class Foo
和一个发布 FooState
枚举的事件。我想把这个事件变成一个可观察的,为新订阅者重播最后一个值。
即使没有订阅者,任何新订阅者也应该获得最后一个值。
public enum FooState
{
Stopped = 0,
Starting = 1,
Running = 2,
}
public delegate void FooEventHandler(object sender, FooEventArgs e);
public class FooEventArgs : EventArgs
{
public FooEventArgs(FooState fooState)
{
this.State = fooState;
}
public FooState State {get; private set;}
}
public class Foo
{
public event FooEventHandler FooEvent;
public void OnFooEvent(FooState state)
{
var fooEvent = FooEvent;
if(fooEvent != null)
{
fooEvent(this, new FooEventArgs(state));
}
}
}
到目前为止,我的尝试围绕着使用 Publish
、RefCount
和 Replay
。但是,如果我在触发事件后订阅可观察对象,我尝试的 none 组合会起作用。
Replay(1).RefCount()
只要已经有至少一个订阅就可以工作,但我也需要为第一个延迟订阅工作。
var foo = new Foo();
var obs = Observable.FromEventPattern<FooEventHandler, FooEventArgs>(
h => foo.FooEvent += h,
h => foo.FooEvent -= h)
.DistinctUntilChanged()
.Replay(1)
.RefCount();
// Works if this line is uncomented.
//obs.Subscribe(x => Console.WriteLine("Early Subscriber = " + x.EventArgs.State));
foo.OnFooEvent(FooState.Running);
obs.Subscribe(x => Console.WriteLine("Late Subscriber = " + x.EventArgs.State));
有谁知道如何用 Rx 做到这一点?
Rx 正在做正确的事情,将您的事件通知转换为您的流并重播它们,但您要问的是:
"Why when I subscribe to the event, don't I get the initial state".
事件不是那样的。如果我在 foo.FooEvent 上执行 +=,我不会立即触发当前值。我只会在它发生变化时收到通知。
正如您所注意到的,'Replay' 将重播后续事件,但不提供订阅时的状态。
要解决您的问题,您需要确保在连接流以获取更改通知之前将当前值放入流中。
查看 Observable.StartWith()。
即在 the.DistinctUntilChanged() 调用之前执行“.StartWith(foo.State)”(紧接在 .FromEventPattern 之后)。
RefCount
仅在第一次订阅后连接。如果你想对连接发生的时间进行细粒度控制,你应该使用 Replay
+ Connect
.
所以这样做:
var publishedSource = eventSource.DistinctUntilChanged().Replay(1);
var connection = publishedSource.Connect();
//Subscribe to publishedSource to receive events and dispose of
connection when you are done.
发自我的 phone 对于任何语法错误提前致歉。
我有一个 class Foo
和一个发布 FooState
枚举的事件。我想把这个事件变成一个可观察的,为新订阅者重播最后一个值。
即使没有订阅者,任何新订阅者也应该获得最后一个值。
public enum FooState
{
Stopped = 0,
Starting = 1,
Running = 2,
}
public delegate void FooEventHandler(object sender, FooEventArgs e);
public class FooEventArgs : EventArgs
{
public FooEventArgs(FooState fooState)
{
this.State = fooState;
}
public FooState State {get; private set;}
}
public class Foo
{
public event FooEventHandler FooEvent;
public void OnFooEvent(FooState state)
{
var fooEvent = FooEvent;
if(fooEvent != null)
{
fooEvent(this, new FooEventArgs(state));
}
}
}
到目前为止,我的尝试围绕着使用 Publish
、RefCount
和 Replay
。但是,如果我在触发事件后订阅可观察对象,我尝试的 none 组合会起作用。
Replay(1).RefCount()
只要已经有至少一个订阅就可以工作,但我也需要为第一个延迟订阅工作。
var foo = new Foo();
var obs = Observable.FromEventPattern<FooEventHandler, FooEventArgs>(
h => foo.FooEvent += h,
h => foo.FooEvent -= h)
.DistinctUntilChanged()
.Replay(1)
.RefCount();
// Works if this line is uncomented.
//obs.Subscribe(x => Console.WriteLine("Early Subscriber = " + x.EventArgs.State));
foo.OnFooEvent(FooState.Running);
obs.Subscribe(x => Console.WriteLine("Late Subscriber = " + x.EventArgs.State));
有谁知道如何用 Rx 做到这一点?
Rx 正在做正确的事情,将您的事件通知转换为您的流并重播它们,但您要问的是: "Why when I subscribe to the event, don't I get the initial state".
事件不是那样的。如果我在 foo.FooEvent 上执行 +=,我不会立即触发当前值。我只会在它发生变化时收到通知。 正如您所注意到的,'Replay' 将重播后续事件,但不提供订阅时的状态。
要解决您的问题,您需要确保在连接流以获取更改通知之前将当前值放入流中。 查看 Observable.StartWith()。
即在 the.DistinctUntilChanged() 调用之前执行“.StartWith(foo.State)”(紧接在 .FromEventPattern 之后)。
RefCount
仅在第一次订阅后连接。如果你想对连接发生的时间进行细粒度控制,你应该使用 Replay
+ Connect
.
所以这样做:
var publishedSource = eventSource.DistinctUntilChanged().Replay(1);
var connection = publishedSource.Connect();
//Subscribe to publishedSource to receive events and dispose of
connection when you are done.
发自我的 phone 对于任何语法错误提前致歉。