这个 observable 会很热吗?
Would this observable be hot?
这不会使 observable 变热吗?
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ObservableNumberGenerator.ObservableImplementationReliesOnOperator.Hot
{
public class RandomNumbers : IObservable<int>, IDisposable
{
protected Random _random = null;
protected int _maxNumbersToGenerate;
protected int _startAfterMilliseconds = 1000;
protected int _generateEveryMilliseconds = 1000;
protected IObservable<int> _innerObservable = null;
protected IDisposable _innerSubscription = null;
protected bool _completed = false;
private bool disposedValue = false;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000,
int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable.Timer(
TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate);
_innerSubscription = _innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
protected virtual void OnCompleted()
{
_completed = true;
}
protected virtual void OnError(Exception ex)
{
}
protected virtual void OnNext(int value)
{
}
protected virtual int GenerateNumber()
{
return _random.Next();
}
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
if (_completed)
{
observer.OnCompleted();
return Disposable.Empty;
}
else
{
return _innerObservable.Subscribe(observer);
}
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_innerSubscription.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
或者我会 到 Publish
和 Connect
让它变热吗?
public class RandomNumbers : IObservable<int>, IDisposable
{
protected IConnectableObservable<int> _innerObservable = null;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000,
int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable
.Timer(TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate)
.Publish();
_innerObservable.Connect();
_innerSubscription = _innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
...
}
您的第一个代码是冷可观察对象。这里有一些简单的客户端代码来演示:
static void Main(string[] args)
{
var x = new RandomNumbers(10, 0, 500);
x.Timestamp().Subscribe(i => Console.WriteLine("First sub: {0} {1}", i.Timestamp.DateTime.ToString("O"), i.Value));
Thread.Sleep(1000);
x.Timestamp().Subscribe(i => Console.WriteLine("Second sub: {0} {1}", i.Timestamp.DateTime.ToString("O"), i.Value));
Console.Read();
}
对于 cold observable,您会看到 "First sub" 行与 "Second sub" 行不匹配。随着热实施,他们做到了。您还会注意到冷实现中有相同数量的 "First sub" 和 "Second sub" 行。通过热实现,全局观察到最大值 10。
只是为了澄清热与冷。假设一个 static/stable 数据流,你会得到这个弹珠图,显示在 t0
和 t1
时订阅有何不同:
hot source: A---B---C---D---
sub at t0 : A---B---C---D---
sub at t1 : B---C---D---
cold source: A---B---C---D---
sub at t0 : A---B---C---D---
sub at t1 : A---B---C---
但在我们的例子中,我们有一个随机流:
hot source: R()-R()-R()-R()-
sub at t0 : R1--R2--R3--R4--
sub at t1 : R2--R3--R4--
cold source: R()-R()-R()-R()-
sub at t0 : R1--R2--R3--R4--
sub at t1 : R5--R6--R7--
Random()
调用的结果在热可观察对象中共享,而在冷对象中则不会。虽然如果您按索引测量,热可观察订阅会获得不同的值,但它们在给定时间 t 会收到相同的值。在稳定源中,冷可观察订阅按索引匹配,但不按时间匹配。在一个不稳定的源中,就像我们的例子一样,冷的可观察量在索引或时间上都不匹配。
正如 Enigmativity 指出的那样,并且正如 Shlomo 的回答似乎也暗示的那样,仅仅因为您在构造函数中订阅了 observable 不会使它成为 "shared",因此想要让它成为热门。
我认真思考了为什么会这样,然后我查看了 Timer
运算符上的 Subscribe
方法的源代码,并意识到一些我之前知道但忘记的事情,因为我是太难看了
大多数运算符都是这样实现的:每次在 Timer
运算符上调用 Subscribe
方法时,都会创建一个内部 class 的新实例来启动新序列在默认池调度程序上。
所以,Timer
class 的某处写着:
namespace System.Reactive.Linq.ObservableImpl
{
internal class Timer : Producer<long>
{
protected override IDisposable Run(
IObserver<long> observer,
IDisposable cancel,
Action<IDisposable> setSink)
{
if (this._period.HasValue)
{
Timer.TimerImpl timerImpl =
new Timer.TimerImpl(this, observer, cancel);
setSink(timerImpl);
return timerImpl.Run();
}
...
}
}
}
将这个事实与我自定义可观察对象的 Subscribe
方法中的事实结合起来,我正在这样做:
public IDisposable Subscribe(IObserver<int> observer)
{
if (_completed)
{
...
}
else
{
return _innerObservable.Subscribe(observer);
}
}
如果我像这样通过内部观察器引导所有观察,我可以在不使用 Publish
和 Connect
或 RefCount
运算符的情况下使这个可观察到热:
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ObservableNumberGenerator.ObservableImplementationReliesOnOperator.Hot
{
public class RandomNumbers : IObservable<int>, IDisposable
{
protected Random _random = null;
protected int _maxNumbersToGenerate;
protected int _startAfterMilliseconds = 1000;
protected int _generateEveryMilliseconds = 1000;
protected List<IObserver<int>> _observers =
new List<IObserver<int>>();
protected IObservable<int> _innerObservable = null;
protected IDisposable _innerSubscription = null;
protected bool _completed = false;
private bool disposedValue = false;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000, int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable
.Timer(
TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate);
_innerSubscription =
_innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
protected virtual void OnCompleted()
{
_completed = true;
foreach (var observer in _observers)
observer.OnCompleted();
}
protected virtual void OnError(Exception ex)
{
_completed = true;
foreach (var observer in _observers)
observer.OnError(ex);
}
protected virtual void OnNext(int value)
{
foreach (var observer in _observers)
observer.OnNext(value);
}
protected virtual int GenerateNumber()
{
return _random.Next();
}
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
_observers.Add(observer);
if (_completed)
{
observer.OnCompleted();
}
return Disposable.Empty;
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_innerSubscription.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
任何想这样做的人请注意:
我所有的例子都只是练习,目的是让我自己了解 Rx 的内部工作原理。
正如 Enigmativity 正确指出的那样,不得在生产代码中以这种方式实现任何可观察值。
一方面,我的 List<IObserver<T>>
和 _innerObserver
中的 none 个观察者都是安全观察者。在 Rx 源代码中,它们将我们传递的每个观察者转换为一个安全的观察者,该观察者在每个 OnNext
、OnError
和 OnCompleted
实现中都有 try/finally
子句以防止异常。有关详细信息,请查看 System.Reactive.SafeObserver<TSource>
class 和 System.Reactive.AnonymousObserver<T>
class.
的源代码
这不会使 observable 变热吗?
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ObservableNumberGenerator.ObservableImplementationReliesOnOperator.Hot
{
public class RandomNumbers : IObservable<int>, IDisposable
{
protected Random _random = null;
protected int _maxNumbersToGenerate;
protected int _startAfterMilliseconds = 1000;
protected int _generateEveryMilliseconds = 1000;
protected IObservable<int> _innerObservable = null;
protected IDisposable _innerSubscription = null;
protected bool _completed = false;
private bool disposedValue = false;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000,
int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable.Timer(
TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate);
_innerSubscription = _innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
protected virtual void OnCompleted()
{
_completed = true;
}
protected virtual void OnError(Exception ex)
{
}
protected virtual void OnNext(int value)
{
}
protected virtual int GenerateNumber()
{
return _random.Next();
}
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
if (_completed)
{
observer.OnCompleted();
return Disposable.Empty;
}
else
{
return _innerObservable.Subscribe(observer);
}
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_innerSubscription.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
或者我会 到 Publish
和 Connect
让它变热吗?
public class RandomNumbers : IObservable<int>, IDisposable
{
protected IConnectableObservable<int> _innerObservable = null;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000,
int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable
.Timer(TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate)
.Publish();
_innerObservable.Connect();
_innerSubscription = _innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
...
}
您的第一个代码是冷可观察对象。这里有一些简单的客户端代码来演示:
static void Main(string[] args)
{
var x = new RandomNumbers(10, 0, 500);
x.Timestamp().Subscribe(i => Console.WriteLine("First sub: {0} {1}", i.Timestamp.DateTime.ToString("O"), i.Value));
Thread.Sleep(1000);
x.Timestamp().Subscribe(i => Console.WriteLine("Second sub: {0} {1}", i.Timestamp.DateTime.ToString("O"), i.Value));
Console.Read();
}
对于 cold observable,您会看到 "First sub" 行与 "Second sub" 行不匹配。随着热实施,他们做到了。您还会注意到冷实现中有相同数量的 "First sub" 和 "Second sub" 行。通过热实现,全局观察到最大值 10。
只是为了澄清热与冷。假设一个 static/stable 数据流,你会得到这个弹珠图,显示在 t0
和 t1
时订阅有何不同:
hot source: A---B---C---D---
sub at t0 : A---B---C---D---
sub at t1 : B---C---D---
cold source: A---B---C---D---
sub at t0 : A---B---C---D---
sub at t1 : A---B---C---
但在我们的例子中,我们有一个随机流:
hot source: R()-R()-R()-R()-
sub at t0 : R1--R2--R3--R4--
sub at t1 : R2--R3--R4--
cold source: R()-R()-R()-R()-
sub at t0 : R1--R2--R3--R4--
sub at t1 : R5--R6--R7--
Random()
调用的结果在热可观察对象中共享,而在冷对象中则不会。虽然如果您按索引测量,热可观察订阅会获得不同的值,但它们在给定时间 t 会收到相同的值。在稳定源中,冷可观察订阅按索引匹配,但不按时间匹配。在一个不稳定的源中,就像我们的例子一样,冷的可观察量在索引或时间上都不匹配。
正如 Enigmativity 指出的那样,并且正如 Shlomo 的回答似乎也暗示的那样,仅仅因为您在构造函数中订阅了 observable 不会使它成为 "shared",因此想要让它成为热门。
我认真思考了为什么会这样,然后我查看了 Timer
运算符上的 Subscribe
方法的源代码,并意识到一些我之前知道但忘记的事情,因为我是太难看了
大多数运算符都是这样实现的:每次在 Timer
运算符上调用 Subscribe
方法时,都会创建一个内部 class 的新实例来启动新序列在默认池调度程序上。
所以,Timer
class 的某处写着:
namespace System.Reactive.Linq.ObservableImpl
{
internal class Timer : Producer<long>
{
protected override IDisposable Run(
IObserver<long> observer,
IDisposable cancel,
Action<IDisposable> setSink)
{
if (this._period.HasValue)
{
Timer.TimerImpl timerImpl =
new Timer.TimerImpl(this, observer, cancel);
setSink(timerImpl);
return timerImpl.Run();
}
...
}
}
}
将这个事实与我自定义可观察对象的 Subscribe
方法中的事实结合起来,我正在这样做:
public IDisposable Subscribe(IObserver<int> observer)
{
if (_completed)
{
...
}
else
{
return _innerObservable.Subscribe(observer);
}
}
如果我像这样通过内部观察器引导所有观察,我可以在不使用 Publish
和 Connect
或 RefCount
运算符的情况下使这个可观察到热:
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
namespace ObservableNumberGenerator.ObservableImplementationReliesOnOperator.Hot
{
public class RandomNumbers : IObservable<int>, IDisposable
{
protected Random _random = null;
protected int _maxNumbersToGenerate;
protected int _startAfterMilliseconds = 1000;
protected int _generateEveryMilliseconds = 1000;
protected List<IObserver<int>> _observers =
new List<IObserver<int>>();
protected IObservable<int> _innerObservable = null;
protected IDisposable _innerSubscription = null;
protected bool _completed = false;
private bool disposedValue = false;
public RandomNumbers(int maxNumbersToGenerate,
int startAfterMilliseconds = 1000, int generateEveryMilliseconds = 1000)
{
_maxNumbersToGenerate = maxNumbersToGenerate;
_startAfterMilliseconds = startAfterMilliseconds;
_generateEveryMilliseconds = generateEveryMilliseconds;
_random = new Random();
_innerObservable = Observable
.Timer(
TimeSpan.FromMilliseconds(_startAfterMilliseconds),
TimeSpan.FromMilliseconds(_generateEveryMilliseconds))
.Select(v => GenerateNumber())
.Take(_maxNumbersToGenerate);
_innerSubscription =
_innerObservable.Subscribe(OnNext, OnError, OnCompleted);
}
protected virtual void OnCompleted()
{
_completed = true;
foreach (var observer in _observers)
observer.OnCompleted();
}
protected virtual void OnError(Exception ex)
{
_completed = true;
foreach (var observer in _observers)
observer.OnError(ex);
}
protected virtual void OnNext(int value)
{
foreach (var observer in _observers)
observer.OnNext(value);
}
protected virtual int GenerateNumber()
{
return _random.Next();
}
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
_observers.Add(observer);
if (_completed)
{
observer.OnCompleted();
}
return Disposable.Empty;
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
_innerSubscription.Dispose();
}
disposedValue = true;
}
}
public void Dispose()
{
Dispose(true);
}
}
}
任何想这样做的人请注意:
我所有的例子都只是练习,目的是让我自己了解 Rx 的内部工作原理。
正如 Enigmativity 正确指出的那样,不得在生产代码中以这种方式实现任何可观察值。
一方面,我的 List<IObserver<T>>
和 _innerObserver
中的 none 个观察者都是安全观察者。在 Rx 源代码中,它们将我们传递的每个观察者转换为一个安全的观察者,该观察者在每个 OnNext
、OnError
和 OnCompleted
实现中都有 try/finally
子句以防止异常。有关详细信息,请查看 System.Reactive.SafeObserver<TSource>
class 和 System.Reactive.AnonymousObserver<T>
class.