Observable.Interval 与 select 到有状态服务导致奇怪的行为

Observable.Interval with select to stateful service causes strange behaviour

我正在尝试结合使用 Observable.Interval 和 select 来轮询服务,这似乎是一种语法上不错的方法。但是,每当我尝试实现一种等待可观察对象完成的方法时,我都会得到奇怪的行为,其中 select 内部调用的服务被多次调用。

无需等待,我得到了我正在寻找的正确行为...

代码

private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Starting subcription");
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));

    Console.WriteLine("Waiting for subcription to complete");
    // need to wait here

    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");

    return result;
}

输出

Starting subcription
Waiting for subcription to complete
Press any key to exit. . .
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 1, Result: b]
Event raised for b
Transform invoked [x: 2, Result: c]
Event raised for c
Transform invoked [x: 3, Result: d]
Event raised for d
Transform invoked [x: 4, Result: NULL]

如果我在可观察对象上调用扩展方法等待,它似乎会导致每个时间间隔调用 Transform 两次,并且只有一个值返回给事件...

代码

private static ConcurrentQueue<string> _data = new ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Starting subcription");
    var disposable = observable.Subscribe(x => Console.WriteLine("Event raised for {0}", x));

    Console.WriteLine("Waiting for subcription to complete");
    observable.Wait();

    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");

    return result;
}

输出

Starting subcription
Waiting for subcription to complete
Transform invoked [x: 0, Result: a]
Event raised for a
Transform invoked [x: 0, Result: b]
Transform invoked [x: 1, Result: c]
Event raised for c
Transform invoked [x: 1, Result: d]
Transform invoked [x: 2, Result: NULL]
Transform invoked [x: 2, Result: NULL]
Press any key to exit. . .

我怀疑这是因为 Wait 正在幕后创建第二个订阅,而我的 observable 背后的服务是有状态的。

我见过有人推荐使用 ToTask 来等待 observable 完成,这有同样奇怪的行为。

那么,在所有订阅者接收同一组数据的同时,在可观察对象后面轮询有状态服务的正确方法是什么?

确保您只订阅一次可观察对象。省略对 Subscribe 的第一次调用,只留下对 Wait 的调用。如果您仍想发出一些日志消息(就像您在订阅中所做的那样),请添加 Do-step:

private static ConcurrentQueue<string> _data = new   ConcurrentQueue<string>(new [] { "a", "b", "c", "d" });

static void Main(string[] args)
{
    var observable = Observable
        .Interval(TimeSpan.FromSeconds(2))
        .Select(Transform)
        .TakeWhile(x => x != null);

    Console.WriteLine("Wait for the observable to complete.");
    observable
        .Do(x => Console.WriteLine("Event raised for {0}", x))
        .Wait();

    Console.WriteLine("Press any key to exit. . .");
    Console.ReadKey();
}

private static string Transform(long x)
{
    string result;
    _data.TryDequeue(out result);

    Console.WriteLine("Transform invoked [x: {0}, Result: {1}]", x, result ?? "NULL");

    return result;
}

注意 Wait 会阻塞(这在 Main 方法中是不可避免的)。此外,当您的可观察对象为空时,它会抛出。如果您对 observable 的任何值都不感兴趣,请添加 LastOrDefault-step。

等待可观察对象本质上是一种异步操作,因此您应该检查是否可以使用 ToTask 而不是 Wait 并在异步方法中等待它。

几件事:

  1. 建议仅在 Subscribe 期间更改状态,而不是 Select 等其他运算符。不过,我不确定这对您的示例是否实用。
  2. 你的 observable 是一个 cold observable,这意味着每个订阅 re-creates 一切:一个每秒在后台滴答作响的新计时器,一个新的 Select 运算符,等等。你有两个订阅,一个来自 Subscribe,另一个来自 Wait,因此您将有两个计时器,两个 Select 操作员附加调用 Transform

您可以通过以下两种方式之一解决此问题:

  1. 将您的 Observable 变成热门 Observable
  2. 取消订阅(@Daniel Weber 概述的解决方案)

你的 observable 作为热 observable 看起来像这样:

var observable = Observable
    .Interval(TimeSpan.FromSeconds(2))
    .Select(Transform)
    .TakeWhile(x => x != null)
    .Publish()
    .RefCount();

我的建议是,既然你在你的 observable 中改变状态,我会把它加热以确保你不会结束 运行 那两次。