Rx.net - 同步与异步观察者 - 取决于来源?
Rx.net - synchronous vs async observers - depends on source?
我是 Rx 的新手,正在努力了解它。没有读过很多书,但首先尝试通过动手实验。
class Program
{
static void Main(string[] args)
{
// one source, produces values with delays
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
IObserver<int> handler = null;
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
这会按预期产生异步交错执行。
另一方面,如果我将源更改为同步,即使观察者也会变得阻塞和同步(相同的线程 ID,在没有完全消耗 sub1 的情况下不会转到 sub2)。
有人可以帮我理解这个吗?这是同步版本
class Program
{
static void Main(string[] args)
{
// one source, produces values
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
IObserver<int> handler = null;
// two observers that consume - first with a delay and the second immediately.
// in this case, the behavior of the observers becomes synchronous?
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
我认为原因是为操作员选择了默认值 IScheduler
。看看接受的答案 here.
对于Generate
,这取决于过载。根据答案,这些是使用的默认调度程序。如果你愿意,你可以验证它们的来源
- 时间运算符的默认
IScheduler
是 DefaultScheduler.Instance
- 后一个运算符的默认
IScheduler
是 CurrentThreadScheduler.Instance
您可以通过在您的同步版本中提供 "non-blocking" 调度程序来确认这一点
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);
我是 Rx 的新手,正在努力了解它。没有读过很多书,但首先尝试通过动手实验。
class Program
{
static void Main(string[] args)
{
// one source, produces values with delays
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
IObserver<int> handler = null;
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
这会按预期产生异步交错执行。
另一方面,如果我将源更改为同步,即使观察者也会变得阻塞和同步(相同的线程 ID,在没有完全消耗 sub1 的情况下不会转到 sub2)。 有人可以帮我理解这个吗?这是同步版本
class Program
{
static void Main(string[] args)
{
// one source, produces values
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
IObserver<int> handler = null;
// two observers that consume - first with a delay and the second immediately.
// in this case, the behavior of the observers becomes synchronous?
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
我认为原因是为操作员选择了默认值 IScheduler
。看看接受的答案 here.
对于Generate
,这取决于过载。根据答案,这些是使用的默认调度程序。如果你愿意,你可以验证它们的来源
- 时间运算符的默认
IScheduler
是DefaultScheduler.Instance
- 后一个运算符的默认
IScheduler
是CurrentThreadScheduler.Instance
您可以通过在您的同步版本中提供 "non-blocking" 调度程序来确认这一点
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);