windowCount 下降值

windowCount dropping values

我正在尝试使用 windowCount 将我的可观察值分组,并针对每个组的每个值发送请求。
然后,连接这些组,以便下一组的请求不会在当前组的请求之前开始请求未完成。
问题是某些值被跳过。

这是我的代码。
(我没有进行实际的 ajax 调用在这里,但 Observable.timer 应该可以作为示例)。

Observable.interval(300)
     .take(12)
     .windowCount(3)
     .concatMap(obs => {
         return obs.mergeMap(
             v => Observable.timer(Math.random() * 1500).mapTo(v)
         );
     })
     .do(v => console.log(v))
     .finally(() => console.log('fin'))
     .subscribe();

我尝试通过手动创建组来替换 windowCount。它完美地工作。没有跳过任何值。

Observable.interval(900)
    .take(4)
    .map(i => Observable.interval(300).take(3).map(j => j + i * 3))
    .concatMap(obs => {
        return obs.mergeMap(
            v => Observable.timer(Math.random() * 1500).mapTo(v)
        );
    })
    .do(v => console.log(v))
    .finally(() => console.log('fin'))
    .subscribe();

我的印象是 windowCount 应该以相同的方式对发出的值进行分组。
但是,显然它做了其他事情。

我将非常感谢对其行为的任何解释。

谢谢!

缺失值是使用热可观察对象 (Observable.interval(300)) 的结果,它继续输出您未存储以供使用的值。

以下是您的代码的一个稍微简化的版本,它还记录了发出数字的时间。我将 Math.random() 替换为 1 以便输出是确定性的。我也把代码加载到jsbin里给你试试:

https://jsbin.com/burocu/edit?js,console

Observable.interval(300)
    .do(x => console.log(x + ") hot observable at: " + (x * 300 + 300)))
    .take(12)
    .windowCount(3)
    .do(observe3 => {observe3.toArray()
      .subscribe(x => console.log(x + " do window count at: " + (x[2] * 300 + 300)));})
    .concatMap(obs => {
        return obs.mergeMap(
            v => Observable.timer(1 * 1500).mapTo(v)
        )
        .do(v => console.log(v + " merge map at: " + (v * 300 + 300 + 1500)));
    })
    .finally(() => console.log('fin windowCount'))
    .subscribe();

结果如下。请注意,当其他运算符仍在处理时,热 observables 继续前进。

这就是给您的印象是价值正在下降的原因。您可以看到 windowCount(3) 正在做您所想的 而不是您所想的 when

"0) hot observable at: 300"
"1) hot observable at: 600"
"2) hot observable at: 900"
"0,1,2 do window count at: 900"
"3) hot observable at: 1200"
"4) hot observable at: 1500"
"5) hot observable at: 1800"
"3,4,5 do window count at: 1800"
"0 merge map at: 1800"
"6) hot observable at: 2100"
"1 merge map at: 2100"
"7) hot observable at: 2400"
"2 merge map at: 2400"
"8) hot observable at: 2700"
"6,7,8 do window count at: 2700"
"9) hot observable at: 3000"
"10) hot observable at: 3300"
"11) hot observable at: 3600"
"9,10,11 do window count at: 3600"
" do window count at: NaN"
"8 merge map at: 4200"
"fin windowCount"

编辑:进一步解释...

windowCount(3) 之后有一个对 concatMap 的调用。 concatMapmapconcatAll 的组合。

concatAll:

Joins every Observable emitted by the source (a higher-order Observable), in a serial fashion. It subscribes to each inner Observable only after the previous inner Observable has completed (emphasis added), and merges all of their values into the returned observable.

因此,查看上面的输出,我们看到第一个 windowCount(3) 值 [0,1,2] 在 1800 和 2400 之间发出。

请注意,第二个 windowCount(3) 值 [3,4,5] 在 1800 发出。concatAll 在发出 [3,4,5] 时尚未准备好订阅,因为 之前的内部 Observable 还没有完成。所以这些值实际上被删除了。

接下来,注意之前的内部 Observable [0,1,2] 在 2400 完成。concatAll 在 2400 订阅。

下一个出现的值是 2700 处的值 8(订阅从 2400 开始后 300 毫秒)。然后值 8 由 mergeMap 在 4200 输出,因为从订阅开始点 2400 开始有 300 的间隔延迟,然后是 1500 的定时器延迟(即 2400 + 300 + 1500 = 4200)。

在这一点之后,序列完成,因此不会发出更多值。

如果需要更多说明,请添加评论。