Observable.Interval 与 select 到有状态服务导致奇怪的行为
Observable.Interval with select to stateful service causes strange behaviour
我正在尝试结合使用 Observable.Interval 和 select 来轮询服务,这似乎是一种语法上不错的方法。但是,每当我尝试实现一种等待可观察对象完成的方法时,我都会得到奇怪的行为,其中 select 内部调用的服务被多次调用。
无需等待,我得到了我正在寻找的正确行为...
代码
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
// need to wait here
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
输出
Starting subcription
Waiting for subcription to complete
Press any key to exit. . .
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 1, Result: b]
Event raised for b
Transform invoked [x: 2, Result: c]
Event raised for c
Transform invoked [x: 3, Result: d]
Event raised for d
Transform invoked [x: 4, Result: NULL]
如果我在可观察对象上调用扩展方法等待,它似乎会导致每个时间间隔调用 Transform 两次,并且只有一个值返回给事件...
代码
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
observable.Wait();
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
输出
Starting subcription
Waiting for subcription to complete
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 0, Result: b]
Transform invoked [x: 1, Result: c]
Event raised for c
Transform invoked [x: 1, Result: d]
Transform invoked [x: 2, Result: NULL]
Transform invoked [x: 2, Result: NULL]
Press any key to exit. . .
我怀疑这是因为 Wait 正在幕后创建第二个订阅,而我的 observable 背后的服务是有状态的。
我见过有人推荐使用 ToTask 来等待 observable 完成,这有同样奇怪的行为。
那么,在所有订阅者接收同一组数据的同时,在可观察对象后面轮询有状态服务的正确方法是什么?
确保您只订阅一次可观察对象。省略对 Subscribe
的第一次调用,只留下对 Wait
的调用。如果您仍想发出一些日志消息(就像您在订阅中所做的那样),请添加 Do
-step:
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Wait for the observable to complete.");
observable
.Do(x => Console.WriteLine("Event raised for {0}", x))
.Wait();
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
注意 Wait
会阻塞(这在 Main
方法中是不可避免的)。此外,当您的可观察对象为空时,它会抛出。如果您对 observable 的任何值都不感兴趣,请添加 LastOrDefault
-step。
等待可观察对象本质上是一种异步操作,因此您应该检查是否可以使用 ToTask
而不是 Wait
并在异步方法中等待它。
几件事:
- 建议仅在
Subscribe
期间更改状态,而不是 Select
等其他运算符。不过,我不确定这对您的示例是否实用。
- 你的 observable 是一个 cold observable,这意味着每个订阅 re-creates 一切:一个每秒在后台滴答作响的新计时器,一个新的
Select
运算符,等等。你有两个订阅,一个来自 Subscribe
,另一个来自 Wait
,因此您将有两个计时器,两个 Select
操作员附加调用 Transform
。
您可以通过以下两种方式之一解决此问题:
- 将您的 Observable 变成热门 Observable
- 取消订阅(@Daniel Weber 概述的解决方案)
你的 observable 作为热 observable 看起来像这样:
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null)
.Publish()
.RefCount();
我的建议是,既然你在你的 observable 中改变状态,我会把它加热以确保你不会结束 运行 那两次。
我正在尝试结合使用 Observable.Interval 和 select 来轮询服务,这似乎是一种语法上不错的方法。但是,每当我尝试实现一种等待可观察对象完成的方法时,我都会得到奇怪的行为,其中 select 内部调用的服务被多次调用。
无需等待,我得到了我正在寻找的正确行为...
代码
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
// need to wait here
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
输出
Starting subcription
Waiting for subcription to complete
Press any key to exit. . .
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 1, Result: b]
Event raised for b
Transform invoked [x: 2, Result: c]
Event raised for c
Transform invoked [x: 3, Result: d]
Event raised for d
Transform invoked [x: 4, Result: NULL]
如果我在可观察对象上调用扩展方法等待,它似乎会导致每个时间间隔调用 Transform 两次,并且只有一个值返回给事件...
代码
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Starting subcription");
var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));
Console.WriteLine("Waiting for subcription to complete");
observable.Wait();
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
输出
Starting subcription
Waiting for subcription to complete
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 0, Result: b]
Transform invoked [x: 1, Result: c]
Event raised for c
Transform invoked [x: 1, Result: d]
Transform invoked [x: 2, Result: NULL]
Transform invoked [x: 2, Result: NULL]
Press any key to exit. . .
我怀疑这是因为 Wait 正在幕后创建第二个订阅,而我的 observable 背后的服务是有状态的。
我见过有人推荐使用 ToTask 来等待 observable 完成,这有同样奇怪的行为。
那么,在所有订阅者接收同一组数据的同时,在可观察对象后面轮询有状态服务的正确方法是什么?
确保您只订阅一次可观察对象。省略对 Subscribe
的第一次调用,只留下对 Wait
的调用。如果您仍想发出一些日志消息(就像您在订阅中所做的那样),请添加 Do
-step:
private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });
static void Main(string[] args)
{
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null);
Console.WriteLine("Wait for the observable to complete.");
observable
.Do(x => Console.WriteLine("Event raised for {0}", x))
.Wait();
Console.WriteLine("Press any key to exit. . .");
Console.ReadKey();
}
private static string Transform(long x)
{
string result;
_data.TryDequeue(out result);
Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");
return result;
}
注意 Wait
会阻塞(这在 Main
方法中是不可避免的)。此外,当您的可观察对象为空时,它会抛出。如果您对 observable 的任何值都不感兴趣,请添加 LastOrDefault
-step。
等待可观察对象本质上是一种异步操作,因此您应该检查是否可以使用 ToTask
而不是 Wait
并在异步方法中等待它。
几件事:
- 建议仅在
Subscribe
期间更改状态,而不是Select
等其他运算符。不过,我不确定这对您的示例是否实用。 - 你的 observable 是一个 cold observable,这意味着每个订阅 re-creates 一切:一个每秒在后台滴答作响的新计时器,一个新的
Select
运算符,等等。你有两个订阅,一个来自Subscribe
,另一个来自Wait
,因此您将有两个计时器,两个Select
操作员附加调用Transform
。
您可以通过以下两种方式之一解决此问题:
- 将您的 Observable 变成热门 Observable
- 取消订阅(@Daniel Weber 概述的解决方案)
你的 observable 作为热 observable 看起来像这样:
var observable = Observable
.Interval(TimeSpan.FromSeconds(2))
.Select(Transform)
.TakeWhile(x => x != null)
.Publish()
.RefCount();
我的建议是,既然你在你的 observable 中改变状态,我会把它加热以确保你不会结束 运行 那两次。