Rx Convert use of subject to Observable.Create 方法

Rx Convert use of subject to Observable.Create method

我正在尝试使用反应式扩展 (Rx) 来创建一个热可观察对象,多个用户都可以订阅该对象,所有这些用户都可以获得推送给他们的值。我可以使用以下主题来做到这一点:

var subj = new Subject<int>();
var observable = subj.AsObservable();
observable.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
observable.Subscribe(x => Console.WriteLine("2 Number: {0}", x));

subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
  //and so on

但我读到主题是供 "experimental" 使用的,我想使用 Observable.Create 工厂方法做同样的事情。我环顾四周,发现有很多使用 Create 方法创建冷可观察对象的示例,但我希望具有与上面代码产生的行为相同的行为。

感谢您的帮助。

尼克

您的问题可以分为两个单独的问题。

1.如何创建没有主题的 Observable?

有很多方法,listed in this perfect bookObservable.Create 只是其中之一,但为了获得像您的示例 (1, 2, 3) 中那样的一系列值,我会简单地使用

var source = Observable.Range(1, 3);

但是,您可能已经注意到,以这种方式创建的流将是冷可观察对象。这就引出了第二个问题:

2。如何将冷观察变成热观察并在订阅者之间共享结果?

为此,您需要一个 Publish 函数。它允许在订阅者之间共享 Rx 流。试试这个:

var sourceHot = Observable.Range(1, 3).Publish();
sourceHot.Subscribe(x => Console.WriteLine("1 Number: {0}", x));
sourceHot.Subscribe(x => Console.WriteLine("2 Number: {0}", x));
var disp = sourceHot.Connect();

注意,如果不想手动调用Connect/Disconnect,可以使用 一个 RefCount 函数。另请注意 stream.Publish() 与调用 stream.Multicast(new Subject<T>()).

完全相同

我强烈推荐阅读 Sharing in RX: Publish, Replay, and Multicast article,它深入解释了这个主题。