对 Publish().Refcount() 行为的混淆

Confusion over behavior of Publish().Refcount()

我这里有一个简单的程序,可以显示各种单词中的字母数。它按预期工作。

static void Main(string[] args) {
    var word = new Subject<string>();
    var wordPub = word.Publish().RefCount();
    var length = word.Select(i => i.Length);
    var report =
        wordPub
        .GroupJoin(length,
            s => wordPub,
            s => Observable.Empty<int>(),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Donkey");
    word.OnNext("Elephant");
    word.OnNext("Zebra");
    Console.ReadLine();
}

输出为:

Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5

我使用了 Publish().RefCount(),因为 "wordpub" 包含在 "report" 中两次。没有它,当一个词发出时,报告的一部分将首先收到回调通知,然后报告的另一部分将收到通知,通知加倍。这就是发生的事情;输出最终有 11 个项目而不是 6 个。至少这是我认为正在发生的事情。我认为在这种情况下使用 Publish().RefCount() 可以同时更新报告的两个部分。

但是,如果我将长度函数更改为也像这样使用已发布的源代码:

var length = wordPub.Select(i => i.Length);

那么输出是这样的:

Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5

为什么长度函数不能也使用相同的已发布源?

RefCount returns 一种 Observable,只要至少有一个对返回的 Observable 的订阅,它就会与源保持连接。处理完最后一个订阅后,RefCount 会处理它与源的连接,并在进行新订阅时重新连接。您的报告查询可能会在查询完成之前处理掉对 'wordPub' 的所有订阅。

您可以简单地代替复杂的 GroupJoin 查询:

var report = word.Select(x => new { Word = x, Length = x.Length });

编辑: 如果您想使用 GroupJoin 运算符,请将报告查询更改为此:

    var report =
        wordPub
        .GroupJoin(length,
            s => wordPub,
            s => Observable.Empty<int>(),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j }));

这是一个需要解决的巨大挑战! 发生这种情况的条件如此微妙。 提前为冗长的解释道歉,但请耐心等待!

TL;DR

对已发布源的订阅按顺序处理,但在任何其他直接对未发布源的订阅之前。也就是说,你可以插队! 使用 GroupJoin 认购顺序对于确定 windows 何时打开和关闭很重要。


我首先担心的是您发布的是对主题的引用。 这应该是一个空操作。 Subject<T> 没有订阅费用。

因此,当您删除 Publish().RefCount() 时:

var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);

然后你会遇到同样的问题。

然后我查看 GroupJoin(因为我的直觉表明 Publish().Refcount() 是一个转移注意力的问题)。 对我来说,仅靠目测很难合理化,所以我也依靠简单的调试我多年来使用了几十次 - TraceLog 扩展方法。

public interface ILogger
{
    void Log(string input);
}
public class DumpLogger : ILogger
{
    public void Log(string input)
    {
        //LinqPad `Dump()` extension method. 
        //  Could use Console.Write instead.
        input.Dump();
    }
}


public static class ObservableLoggingExtensions
{
    private static int _index = 0;

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
    {
        return Observable.Create<T>(o =>
        {
            var index = Interlocked.Increment(ref _index);
            var label = $"{index:0000}{name}";
            logger.Log($"{label}.Subscribe()");
            var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
            var subscription = source
                .Do(
                    x => logger.Log($"{label}.OnNext({x.ToString()})"),
                    ex => logger.Log($"{label}.OnError({ex})"),
                    () => logger.Log($"{label}.OnCompleted()")
                )
                .Subscribe(o);

            return new CompositeDisposable(subscription, disposed);
        });
    }
}

当我将日志记录添加到您提供的代码时,它看起来像这样:

var logger = new DumpLogger();

var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
    wordPub.Log(logger, "lhs")
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
        s => wordPub.Log(logger, "lhsDuration"),
        s => Observable.Empty<int>().Log(logger, "rhsDuration"),
        (w, a) => new { Word = w, Lengths = a })
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");

这将在我的日志中输出如下内容

使用 Publish().RefCount() 记录

0001lhs.Subscribe()             
0002rhs.Subscribe()             
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 
...

然而,当我删除使用 Publish().RefCount() 时,新的日志输出如下:

没有主题的日志

0001lhs.Subscribe()                 
0002rhs.Subscribe()                 
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()         
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()         
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6 

    OnNext
    Banana 6 

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...

这给了我们一些见解,但是当我们开始使用逻辑订阅列表注释我们的日志时,问题才真正变得清晰。

在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示

//word.Subsribers.Add(wordPub)

0001lhs.Subscribe()             //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe()             //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 

所以在这个例子中,当 word.OnNext("Banana"); 被执行时,观察者链是按照这个顺序链接的

  1. wordPub
  2. 0002rhs

不过wordPub有子订阅! 所以真正的订阅列表看起来像

  1. wordPub
    1. 0001lhs
    2. 0003lhsDuration
    3. 0005lhsDuration
  2. 0002rhs

如果我们注释主题日志,我们就会看到微妙之处

0001lhs.Subscribe()                 //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe()                 //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()         //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()         //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6 

    OnNext
    Banana 6 

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()

所以在这个例子中,当 word.OnNext("Banana"); 被执行时,观察者链是按照这个顺序链接的

1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration

由于 0003lhsDuration 订阅在 0002rhs 之后激活,它不会看到 "Banana" 值来终止 window,直到 之后rhs 已发送值,因此在仍然打开的 window.

中产生它

正如@francezu13k50 所指出的,解决您的问题的明显而简单的方法就是使用 word.Select(x => new { Word = x, Length = x.Length });,但我认为您已经为我们提供了您实际问题的简化版本(感谢),我理解为什么会这样不适合。 但是,因为我不知道你真正的问题 space 是什么,所以我不确定向你建议什么来提供解决方案,除了你有一个与你当前的代码,现在你应该知道它为什么这样工作确实如此。

因为 GroupJoin 似乎很难使用,这里是另一种关联函数输入和输出的方法。

static void Main(string[] args) {
    var word = new Subject<string>();
    var length = new Subject<int>();
    var report =
        word
        .CombineLatest(length, (w, l) => new { Word = w, Length = l })
        .Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 })
        .Where(i => i.Length != -1);
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple"); length.OnNext(5);
    word.OnNext("Banana");
    word.OnNext("Cat"); length.OnNext(3);
    word.OnNext("Donkey");
    word.OnNext("Elephant"); length.OnNext(8);
    word.OnNext("Zebra"); length.OnNext(5);
    Console.ReadLine();
}

如果每个输入都有 0 个或多个输出,但受制于 (1) 输出仅以与输入相同的顺序到达并且 (2) 每个输出对应于其最新输入的约束,则此方法有效。这就像 LeftJoin - 第一个列表(单词)中的每个项目都与随后到达的右侧列表(长度)中的项目配对,直到发出第一个列表中的另一个项目。

正在尝试使用常规 Join 而不是 GroupJoin。我认为问题在于,当创建一个新词时,Join 内部在创建新 window 和结束当前词之间存在竞争条件。所以在这里我试图通过将每个单词与表示 window 结束的空配对来消除它。不起作用,就像第一个版本没有一样。在没有先关闭前一个单词的情况下,如何为每个单词创建一个新的 window ?完全糊涂了。

static void Main(string[] args) {
    var lgr = new DelegateLogger(Console.WriteLine);
    var word = new Subject<string>();
    var wordDelimited =
        word
        .Select(i => Observable.Return<string>(null).StartWith(i))
        .SelectMany(i => i);
    var wordStart = wordDelimited.Where(i => i != null);
    var wordEnd = wordDelimited.Where(i => i == null);
    var report = Observable
        .Join(
            wordStart.Log(lgr, "word"), // starts window
            wordStart.Select(i => i.Length),
            s => wordEnd.Log(lgr, "expireWord"), // ends current window
            s => Observable.Empty<int>(),
            (l, r) => new { Word = l, Length = r });
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Zebra");
    word.OnNext("Elephant");
    word.OnNext("Bear");
    Console.ReadLine();
}