仅将 GroupBy 与 Concat 结合使用 returns 第一组值
Using GroupBy with Concat only returns first group values
我有一些 GroupBy 的测试代码,它按预期工作...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Replay()
.RefCount();
coreObservable.Subscribe(
x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed
问题是当我尝试对这个表达式使用 Concat 时...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Concat() // JUST ADDED THIS
.Replay()
.RefCount();
coreObservable.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed
注意只公开了第一组值。
我使用 GroupBy 而不是 Buffer 的原因是因为我试图将其用作为突发数据馈送创建最大大小块的方法。原始的可观察对象可能是项目数组,当单个事件中的项目太多时,我想拆分数组。
我想使用 Concat 的原因是因为我希望能够在数组事件之间创建延迟,就像很多人推荐的那样 here。
将 Concat()
替换为 Merge()
,它可以正常工作。
我认为你的问题的原因是 Concat()
在当前序列完成之前不会开始收听下一个序列。
连接图:
s1 --0--1--2-|
s2 -5--6--7--8--|
r --0--1--2--5--6--7--8--|
而Merge()
同时订阅所有子序列,并在任何子序列发布一个值时发布一个值。
合并图表:
s1 --1--1--1--|
s2 ---2---2---2|
r --12-1-21--2|
因此,在您的情况下,Concat()
订阅了 Select(x => x.ToList())
中的第一个 IObservable<IList<int>>
,发布值直到它完成,然后订阅下一个序列。 GroupBy()
将为它找到的每个组创建一个新的 IGroupedObservable
流,但是所有 IGroupedObservable
将同时完成:当基础流完成时。
所以Concat()
监听第一个流直到它完成,但是当第一个流完成时,所有其他流也都完成了(因为它们实际上都是相同的序列,只是按键分割),所以对于以下序列,它没有要发布的值。
所有图表都是从 here 借来的,这是 Rx 的绝佳资源,我强烈建议您在那里查看有关各种运算符如何工作的任何问题。
您的问题可以简化为这样的问题,考虑起来可能更简单:
var sw = Stopwatch.StartNew();
var subject = new Subject<int>();
var o2 = subject.Where(i => i % 2 == 0).ToList();
var o3 = subject.Where(i => i % 3 == 0).ToList();
var o4 = subject.Where(i => i % 4 == 0).ToList();
var c = Observable.Concat(o2, o3, o4)
// .Replay()
// .RefCount()
//.Replay().RefCount() has no impact here.
;
c.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
for(int i = 0; i < 6; i++)
subject.OnNext(i);
subject.OnCompleted();
输出:
Event raised [Values: 0|2|4, Timestamp: 00:00:00.0002278]
Event raised [Values: , Timestamp: 00:00:00.0002850]
Event raised [Values: , Timestamp: 00:00:00.0003049]
Subcription closed
如果你要用大理石绘制这些图,它会像这样:
s : 012345|
o2 : ------(024)|
o3 : ------(03) |
o4 : ------(04) |
cOut: ------(024)|
cSub: (So2)------(So3)(So4)
cSub shows when c subscribes to child observables. cOut shows c's output.
So2 means subscribe to o2, So3 means subscribe to o3, etc..
Concat
订阅传递给它的第一个 observable,然后仅在当前 observable 完成时订阅后续的 observable。在我们的例子中,ToList
在源完成之前不会遗漏任何东西,当它转储整个列表时。所以o2
、o3
、o4
都同时完成,但是c
只订阅了o2
。 o2
完成后,它会尝试订阅其他人,但他们已经完成了。
至于如何修复它,Merge
会起作用,但我猜您想在第 2 组之前处理第 1 组,Merge
会中断。
我有一些 GroupBy 的测试代码,它按预期工作...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Replay()
.RefCount();
coreObservable.Subscribe(
x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed
问题是当我尝试对这个表达式使用 Concat 时...
代码
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx / groupSize, x => x.x)
.Select(x => x.ToList())
.Concat() // JUST ADDED THIS
.Replay()
.RefCount();
coreObservable.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
输出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed
注意只公开了第一组值。
我使用 GroupBy 而不是 Buffer 的原因是因为我试图将其用作为突发数据馈送创建最大大小块的方法。原始的可观察对象可能是项目数组,当单个事件中的项目太多时,我想拆分数组。
我想使用 Concat 的原因是因为我希望能够在数组事件之间创建延迟,就像很多人推荐的那样 here。
将 Concat()
替换为 Merge()
,它可以正常工作。
我认为你的问题的原因是 Concat()
在当前序列完成之前不会开始收听下一个序列。
连接图:
s1 --0--1--2-|
s2 -5--6--7--8--|
r --0--1--2--5--6--7--8--|
而Merge()
同时订阅所有子序列,并在任何子序列发布一个值时发布一个值。
合并图表:
s1 --1--1--1--|
s2 ---2---2---2|
r --12-1-21--2|
因此,在您的情况下,Concat()
订阅了 Select(x => x.ToList())
中的第一个 IObservable<IList<int>>
,发布值直到它完成,然后订阅下一个序列。 GroupBy()
将为它找到的每个组创建一个新的 IGroupedObservable
流,但是所有 IGroupedObservable
将同时完成:当基础流完成时。
所以Concat()
监听第一个流直到它完成,但是当第一个流完成时,所有其他流也都完成了(因为它们实际上都是相同的序列,只是按键分割),所以对于以下序列,它没有要发布的值。
所有图表都是从 here 借来的,这是 Rx 的绝佳资源,我强烈建议您在那里查看有关各种运算符如何工作的任何问题。
您的问题可以简化为这样的问题,考虑起来可能更简单:
var sw = Stopwatch.StartNew();
var subject = new Subject<int>();
var o2 = subject.Where(i => i % 2 == 0).ToList();
var o3 = subject.Where(i => i % 3 == 0).ToList();
var o4 = subject.Where(i => i % 4 == 0).ToList();
var c = Observable.Concat(o2, o3, o4)
// .Replay()
// .RefCount()
//.Replay().RefCount() has no impact here.
;
c.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
for(int i = 0; i < 6; i++)
subject.OnNext(i);
subject.OnCompleted();
输出:
Event raised [Values: 0|2|4, Timestamp: 00:00:00.0002278]
Event raised [Values: , Timestamp: 00:00:00.0002850]
Event raised [Values: , Timestamp: 00:00:00.0003049]
Subcription closed
如果你要用大理石绘制这些图,它会像这样:
s : 012345|
o2 : ------(024)|
o3 : ------(03) |
o4 : ------(04) |
cOut: ------(024)|
cSub: (So2)------(So3)(So4)
cSub shows when c subscribes to child observables. cOut shows c's output.
So2 means subscribe to o2, So3 means subscribe to o3, etc..
Concat
订阅传递给它的第一个 observable,然后仅在当前 observable 完成时订阅后续的 observable。在我们的例子中,ToList
在源完成之前不会遗漏任何东西,当它转储整个列表时。所以o2
、o3
、o4
都同时完成,但是c
只订阅了o2
。 o2
完成后,它会尝试订阅其他人,但他们已经完成了。
至于如何修复它,Merge
会起作用,但我猜您想在第 2 组之前处理第 1 组,Merge
会中断。