对 Publish().Refcount() 行为的混淆
Confusion over behavior of Publish().Refcount()
我这里有一个简单的程序,可以显示各种单词中的字母数。它按预期工作。
static void Main(string[] args) {
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Console.ReadLine();
}
输出为:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
我使用了 Publish().RefCount(),因为 "wordpub" 包含在 "report" 中两次。没有它,当一个词发出时,报告的一部分将首先收到回调通知,然后报告的另一部分将收到通知,通知加倍。这就是发生的事情;输出最终有 11 个项目而不是 6 个。至少这是我认为正在发生的事情。我认为在这种情况下使用 Publish().RefCount() 可以同时更新报告的两个部分。
但是,如果我将长度函数更改为也像这样使用已发布的源代码:
var length = wordPub.Select(i => i.Length);
那么输出是这样的:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
为什么长度函数不能也使用相同的已发布源?
RefCount
returns 一种 Observable,只要至少有一个对返回的 Observable 的订阅,它就会与源保持连接。处理完最后一个订阅后,RefCount
会处理它与源的连接,并在进行新订阅时重新连接。您的报告查询可能会在查询完成之前处理掉对 'wordPub' 的所有订阅。
您可以简单地代替复杂的 GroupJoin 查询:
var report = word.Select(x => new { Word = x, Length = x.Length });
编辑:
如果您想使用 GroupJoin
运算符,请将报告查询更改为此:
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j }));
这是一个需要解决的巨大挑战!
发生这种情况的条件如此微妙。
提前为冗长的解释道歉,但请耐心等待!
TL;DR
对已发布源的订阅按顺序处理,但在任何其他直接对未发布源的订阅之前。也就是说,你可以插队!
使用 GroupJoin
认购顺序对于确定 windows 何时打开和关闭很重要。
我首先担心的是您发布的是对主题的引用。
这应该是一个空操作。
Subject<T>
没有订阅费用。
因此,当您删除 Publish().RefCount()
时:
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
然后你会遇到同样的问题。
然后我查看 GroupJoin
(因为我的直觉表明 Publish().Refcount()
是一个转移注意力的问题)。
对我来说,仅靠目测很难合理化,所以我也依靠简单的调试我多年来使用了几十次 - Trace
或 Log
扩展方法。
public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
当我将日志记录添加到您提供的代码时,它看起来像这样:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
这将在我的日志中输出如下内容
使用 Publish().RefCount() 记录
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
然而,当我删除使用 Publish().RefCount()
时,新的日志输出如下:
没有主题的日志
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
这给了我们一些见解,但是当我们开始使用逻辑订阅列表注释我们的日志时,问题才真正变得清晰。
在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
所以在这个例子中,当 word.OnNext("Banana");
被执行时,观察者链是按照这个顺序链接的
- wordPub
- 0002rhs
不过,wordPub有子订阅!
所以真正的订阅列表看起来像
- wordPub
- 0001lhs
0003lhsDuration
- 0005lhsDuration
- 0002rhs
如果我们注释主题日志,我们就会看到微妙之处
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
所以在这个例子中,当 word.OnNext("Banana");
被执行时,观察者链是按照这个顺序链接的
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
由于 0003lhsDuration
订阅在 0002rhs
之后激活,它不会看到 "Banana" 值来终止 window,直到 之后rhs 已发送值,因此在仍然打开的 window.
中产生它
呼
正如@francezu13k50 所指出的,解决您的问题的明显而简单的方法就是使用 word.Select(x => new { Word = x, Length = x.Length });
,但我认为您已经为我们提供了您实际问题的简化版本(感谢),我理解为什么会这样不适合。
但是,因为我不知道你真正的问题 space 是什么,所以我不确定向你建议什么来提供解决方案,除了你有一个与你当前的代码,现在你应该知道它为什么这样工作确实如此。
因为 GroupJoin 似乎很难使用,这里是另一种关联函数输入和输出的方法。
static void Main(string[] args) {
var word = new Subject<string>();
var length = new Subject<int>();
var report =
word
.CombineLatest(length, (w, l) => new { Word = w, Length = l })
.Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 })
.Where(i => i.Length != -1);
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple"); length.OnNext(5);
word.OnNext("Banana");
word.OnNext("Cat"); length.OnNext(3);
word.OnNext("Donkey");
word.OnNext("Elephant"); length.OnNext(8);
word.OnNext("Zebra"); length.OnNext(5);
Console.ReadLine();
}
如果每个输入都有 0 个或多个输出,但受制于 (1) 输出仅以与输入相同的顺序到达并且 (2) 每个输出对应于其最新输入的约束,则此方法有效。这就像 LeftJoin - 第一个列表(单词)中的每个项目都与随后到达的右侧列表(长度)中的项目配对,直到发出第一个列表中的另一个项目。
正在尝试使用常规 Join 而不是 GroupJoin。我认为问题在于,当创建一个新词时,Join 内部在创建新 window 和结束当前词之间存在竞争条件。所以在这里我试图通过将每个单词与表示 window 结束的空配对来消除它。不起作用,就像第一个版本没有一样。在没有先关闭前一个单词的情况下,如何为每个单词创建一个新的 window ?完全糊涂了。
static void Main(string[] args) {
var lgr = new DelegateLogger(Console.WriteLine);
var word = new Subject<string>();
var wordDelimited =
word
.Select(i => Observable.Return<string>(null).StartWith(i))
.SelectMany(i => i);
var wordStart = wordDelimited.Where(i => i != null);
var wordEnd = wordDelimited.Where(i => i == null);
var report = Observable
.Join(
wordStart.Log(lgr, "word"), // starts window
wordStart.Select(i => i.Length),
s => wordEnd.Log(lgr, "expireWord"), // ends current window
s => Observable.Empty<int>(),
(l, r) => new { Word = l, Length = r });
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Zebra");
word.OnNext("Elephant");
word.OnNext("Bear");
Console.ReadLine();
}
我这里有一个简单的程序,可以显示各种单词中的字母数。它按预期工作。
static void Main(string[] args) {
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Console.ReadLine();
}
输出为:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
我使用了 Publish().RefCount(),因为 "wordpub" 包含在 "report" 中两次。没有它,当一个词发出时,报告的一部分将首先收到回调通知,然后报告的另一部分将收到通知,通知加倍。这就是发生的事情;输出最终有 11 个项目而不是 6 个。至少这是我认为正在发生的事情。我认为在这种情况下使用 Publish().RefCount() 可以同时更新报告的两个部分。
但是,如果我将长度函数更改为也像这样使用已发布的源代码:
var length = wordPub.Select(i => i.Length);
那么输出是这样的:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
为什么长度函数不能也使用相同的已发布源?
RefCount
returns 一种 Observable,只要至少有一个对返回的 Observable 的订阅,它就会与源保持连接。处理完最后一个订阅后,RefCount
会处理它与源的连接,并在进行新订阅时重新连接。您的报告查询可能会在查询完成之前处理掉对 'wordPub' 的所有订阅。
您可以简单地代替复杂的 GroupJoin 查询:
var report = word.Select(x => new { Word = x, Length = x.Length });
编辑:
如果您想使用 GroupJoin
运算符,请将报告查询更改为此:
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.FirstAsync().Select(j => new { Word = i.Word, Length = j }));
这是一个需要解决的巨大挑战! 发生这种情况的条件如此微妙。 提前为冗长的解释道歉,但请耐心等待!
TL;DR
对已发布源的订阅按顺序处理,但在任何其他直接对未发布源的订阅之前。也就是说,你可以插队!
使用 GroupJoin
认购顺序对于确定 windows 何时打开和关闭很重要。
我首先担心的是您发布的是对主题的引用。
这应该是一个空操作。
Subject<T>
没有订阅费用。
因此,当您删除 Publish().RefCount()
时:
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
然后你会遇到同样的问题。
然后我查看 GroupJoin
(因为我的直觉表明 Publish().Refcount()
是一个转移注意力的问题)。
对我来说,仅靠目测很难合理化,所以我也依靠简单的调试我多年来使用了几十次 - Trace
或 Log
扩展方法。
public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
当我将日志记录添加到您提供的代码时,它看起来像这样:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
这将在我的日志中输出如下内容
使用 Publish().RefCount() 记录
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
然而,当我删除使用 Publish().RefCount()
时,新的日志输出如下:
没有主题的日志
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
这给了我们一些见解,但是当我们开始使用逻辑订阅列表注释我们的日志时,问题才真正变得清晰。
在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
所以在这个例子中,当 word.OnNext("Banana");
被执行时,观察者链是按照这个顺序链接的
- wordPub
- 0002rhs
不过,wordPub有子订阅! 所以真正的订阅列表看起来像
- wordPub
- 0001lhs
0003lhsDuration- 0005lhsDuration
- 0002rhs
如果我们注释主题日志,我们就会看到微妙之处
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
所以在这个例子中,当 word.OnNext("Banana");
被执行时,观察者链是按照这个顺序链接的
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
由于 0003lhsDuration
订阅在 0002rhs
之后激活,它不会看到 "Banana" 值来终止 window,直到 之后rhs 已发送值,因此在仍然打开的 window.
呼
正如@francezu13k50 所指出的,解决您的问题的明显而简单的方法就是使用 word.Select(x => new { Word = x, Length = x.Length });
,但我认为您已经为我们提供了您实际问题的简化版本(感谢),我理解为什么会这样不适合。
但是,因为我不知道你真正的问题 space 是什么,所以我不确定向你建议什么来提供解决方案,除了你有一个与你当前的代码,现在你应该知道它为什么这样工作确实如此。
因为 GroupJoin 似乎很难使用,这里是另一种关联函数输入和输出的方法。
static void Main(string[] args) {
var word = new Subject<string>();
var length = new Subject<int>();
var report =
word
.CombineLatest(length, (w, l) => new { Word = w, Length = l })
.Scan((a, b) => new { Word = b.Word, Length = a.Word == b.Word ? b.Length : -1 })
.Where(i => i.Length != -1);
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple"); length.OnNext(5);
word.OnNext("Banana");
word.OnNext("Cat"); length.OnNext(3);
word.OnNext("Donkey");
word.OnNext("Elephant"); length.OnNext(8);
word.OnNext("Zebra"); length.OnNext(5);
Console.ReadLine();
}
如果每个输入都有 0 个或多个输出,但受制于 (1) 输出仅以与输入相同的顺序到达并且 (2) 每个输出对应于其最新输入的约束,则此方法有效。这就像 LeftJoin - 第一个列表(单词)中的每个项目都与随后到达的右侧列表(长度)中的项目配对,直到发出第一个列表中的另一个项目。
正在尝试使用常规 Join 而不是 GroupJoin。我认为问题在于,当创建一个新词时,Join 内部在创建新 window 和结束当前词之间存在竞争条件。所以在这里我试图通过将每个单词与表示 window 结束的空配对来消除它。不起作用,就像第一个版本没有一样。在没有先关闭前一个单词的情况下,如何为每个单词创建一个新的 window ?完全糊涂了。
static void Main(string[] args) {
var lgr = new DelegateLogger(Console.WriteLine);
var word = new Subject<string>();
var wordDelimited =
word
.Select(i => Observable.Return<string>(null).StartWith(i))
.SelectMany(i => i);
var wordStart = wordDelimited.Where(i => i != null);
var wordEnd = wordDelimited.Where(i => i == null);
var report = Observable
.Join(
wordStart.Log(lgr, "word"), // starts window
wordStart.Select(i => i.Length),
s => wordEnd.Log(lgr, "expireWord"), // ends current window
s => Observable.Empty<int>(),
(l, r) => new { Word = l, Length = r });
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Zebra");
word.OnNext("Elephant");
word.OnNext("Bear");
Console.ReadLine();
}