RxJS - 拆分数组并在达到特定计数时发出
RxJS - split the array and emit when specific count is reached
我是 RxJS 的新手。我有特定的数组,我正在使用 Observable。
[a,b,c,d,e,f,g,h]
我想处理背压,每次处理3个元素。
我想像这样拆分并发出:
[a,b,c] [d,e,f] [g,h]
我正在为此使用 bufferCount(3)。但是 bufferCount 的问题在于它确实会发出最后一个间隔。由于最后一个区间只有2个元素。
这是我的示例代码
from(streamOfArr$).pipe(
flatMap(somefunc()),
bufferCount(3),
tap((x) => {
console.log('3', x);
}),
flatMap(somefunc1()),
如何发出 bufferCount 中的最后一个间隔。
我相信这里还有另一个问题。
如果你看到这个例子:https://stackblitz.com/edit/rxjs-3jwznz
import { from } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
const source = from([1,2,3,4,5,6,7,8]).pipe(
bufferCount(3)
);
source.subscribe(x => console.log(x));
您会看到 bufferCount 发出最后一个间隔(即使最后一个只有 2 个)。
您确定您的 source observable 正在完成吗?
我认为您可能想用并发数检查 mergeAll
而不是 bufferCount
。 mergeAll
不会将数据拆分成块,但它只会在另一个先前的可观察对象完成时订阅下一个可观察对象。只有 n 个 observables 同时是热的。
function obs(n: number): Observable<number> {
return of(n).pipe(
tap(() => console.log('Processing: ', n)),
delay(n * 500),
);
}
const observables$ = [obs(1), obs(2), obs(3), obs(4), obs(5), obs(6), obs(7), obs(8), obs(9), obs(10)];
from(observables$).pipe(
mergeAll(3),
).subscribe(console.log);
另见 stackblitz:https://stackblitz.com/edit/rxjs-rcruhv?file=index.ts
通过这个例子,你可以在控制台中看到,开始时直接打印了3个输出,当订阅输出发生时,每隔一个处理输出出现。
我是 RxJS 的新手。我有特定的数组,我正在使用 Observable。
[a,b,c,d,e,f,g,h]
我想处理背压,每次处理3个元素。
我想像这样拆分并发出:
[a,b,c] [d,e,f] [g,h]
我正在为此使用 bufferCount(3)。但是 bufferCount 的问题在于它确实会发出最后一个间隔。由于最后一个区间只有2个元素。
这是我的示例代码
from(streamOfArr$).pipe(
flatMap(somefunc()),
bufferCount(3),
tap((x) => {
console.log('3', x);
}),
flatMap(somefunc1()),
如何发出 bufferCount 中的最后一个间隔。
我相信这里还有另一个问题。
如果你看到这个例子:https://stackblitz.com/edit/rxjs-3jwznz
import { from } from 'rxjs';
import { bufferCount } from 'rxjs/operators';
const source = from([1,2,3,4,5,6,7,8]).pipe(
bufferCount(3)
);
source.subscribe(x => console.log(x));
您会看到 bufferCount 发出最后一个间隔(即使最后一个只有 2 个)。 您确定您的 source observable 正在完成吗?
我认为您可能想用并发数检查 mergeAll
而不是 bufferCount
。 mergeAll
不会将数据拆分成块,但它只会在另一个先前的可观察对象完成时订阅下一个可观察对象。只有 n 个 observables 同时是热的。
function obs(n: number): Observable<number> {
return of(n).pipe(
tap(() => console.log('Processing: ', n)),
delay(n * 500),
);
}
const observables$ = [obs(1), obs(2), obs(3), obs(4), obs(5), obs(6), obs(7), obs(8), obs(9), obs(10)];
from(observables$).pipe(
mergeAll(3),
).subscribe(console.log);
另见 stackblitz:https://stackblitz.com/edit/rxjs-rcruhv?file=index.ts
通过这个例子,你可以在控制台中看到,开始时直接打印了3个输出,当订阅输出发生时,每隔一个处理输出出现。