Rx Convert use of subject to Observable.Create 方法
Rx Convert use of subject to Observable.Create method
我正在尝试使用反应式扩展 (Rx) 来创建一个热可观察对象,多个用户都可以订阅该对象,所有这些用户都可以获得推送给他们的值。我可以使用以下主题来做到这一点:
var subj = new Subject<int>();
var observable = subj.AsObservable();
observable.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
observable.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
//and so on
但我读到主题是供 "experimental" 使用的,我想使用 Observable.Create 工厂方法做同样的事情。我环顾四周,发现有很多使用 Create 方法创建冷可观察对象的示例,但我希望具有与上面代码产生的行为相同的行为。
感谢您的帮助。
尼克
您的问题可以分为两个单独的问题。
1.如何创建没有主题的 Observable?
有很多方法,listed in this perfect book。 Observable.Create
只是其中之一,但为了获得像您的示例 (1, 2, 3) 中那样的一系列值,我会简单地使用
var source = Observable.Range(1, 3);
但是,您可能已经注意到,以这种方式创建的流将是冷可观察对象。这就引出了第二个问题:
2。如何将冷观察变成热观察并在订阅者之间共享结果?
为此,您需要一个 Publish
函数。它允许在订阅者之间共享 Rx 流。试试这个:
var sourceHot = Observable.Range(1, 3).Publish();
sourceHot.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
sourceHot.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
var disp = sourceHot.Connect();
注意,如果不想手动调用Connect
/Disconnect
,可以使用
一个 RefCount 函数。另请注意 stream.Publish()
与调用 stream.Multicast(new Subject<T>())
.
完全相同
我强烈推荐阅读 Sharing in RX: Publish, Replay, and Multicast article,它深入解释了这个主题。
我正在尝试使用反应式扩展 (Rx) 来创建一个热可观察对象,多个用户都可以订阅该对象,所有这些用户都可以获得推送给他们的值。我可以使用以下主题来做到这一点:
var subj = new Subject<int>();
var observable = subj.AsObservable();
observable.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
observable.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
//and so on
但我读到主题是供 "experimental" 使用的,我想使用 Observable.Create 工厂方法做同样的事情。我环顾四周,发现有很多使用 Create 方法创建冷可观察对象的示例,但我希望具有与上面代码产生的行为相同的行为。
感谢您的帮助。
尼克
您的问题可以分为两个单独的问题。
1.如何创建没有主题的 Observable?
有很多方法,listed in this perfect book。 Observable.Create
只是其中之一,但为了获得像您的示例 (1, 2, 3) 中那样的一系列值,我会简单地使用
var source = Observable.Range(1, 3);
但是,您可能已经注意到,以这种方式创建的流将是冷可观察对象。这就引出了第二个问题:
2。如何将冷观察变成热观察并在订阅者之间共享结果?
为此,您需要一个 Publish
函数。它允许在订阅者之间共享 Rx 流。试试这个:
var sourceHot = Observable.Range(1, 3).Publish();
sourceHot.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
sourceHot.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
var disp = sourceHot.Connect();
注意,如果不想手动调用Connect
/Disconnect
,可以使用
一个 RefCount 函数。另请注意 stream.Publish()
与调用 stream.Multicast(new Subject<T>())
.
我强烈推荐阅读 Sharing in RX: Publish, Replay, and Multicast article,它深入解释了这个主题。