RxJS - 拆分数组并在达到特定计数时发出

RxJS - split the array and emit when specific count is reached

我是 RxJS 的新手。我有特定的数组,我正在使用 Observable。

[a,b,c,d,e,f,g,h]

我想处理背压,每次处理3个元素。

我想像这样拆分并发出:

[a,b,c] [d,e,f] [g,h]

我正在为此使用 bufferCount(3)。但是 bufferCount 的问题在于它确实会发出最后一个间隔。由于最后一个区间只有2个元素。

这是我的示例代码

from(streamOfArr$).pipe(
         flatMap(somefunc()),
         bufferCount(3),
         tap((x) => {
                    console.log('3', x);
                }),
         flatMap(somefunc1()),

如何发出 bufferCount 中的最后一个间隔。

我相信这里还有另一个问题。

如果你看到这个例子:https://stackblitz.com/edit/rxjs-3jwznz

import { from } from 'rxjs'; 
import { bufferCount } from 'rxjs/operators';


const source = from([1,2,3,4,5,6,7,8]).pipe(
  bufferCount(3)
);

source.subscribe(x => console.log(x));

您会看到 bufferCount 发出最后一个间隔(即使最后一个只有 2 个)。 您确定您的 source observable 正在完成吗?

我认为您可能想用并发数检查 mergeAll 而不是 bufferCountmergeAll 不会将数据拆分成块,但它只会在另一个先前的可观察对象完成时订阅下一个可观察对象。只有 n 个 observables 同时是热的。

function obs(n: number): Observable<number> {
  return of(n).pipe(
    tap(() => console.log('Processing: ', n)),
    delay(n * 500),
  );
}

const observables$ = [obs(1), obs(2), obs(3), obs(4), obs(5), obs(6), obs(7), obs(8), obs(9), obs(10)];

from(observables$).pipe(
  mergeAll(3),
).subscribe(console.log);

另见 stackblitz:https://stackblitz.com/edit/rxjs-rcruhv?file=index.ts

通过这个例子,你可以在控制台中看到,开始时直接打印了3个输出,当订阅输出发生时,每隔一个处理输出出现。