如何将一个流按顺序拆分为多个流
How to split a stream into multiple streams in sequence
使用 RxJS 6,
我有一个任意数据流:
[in] -> a, b, c, d, e, f, g, h, i, ....
我想以交替顺序将其拆分为固定数量的 N 个流(在本例中为 3 个输出流):
[out] -> stream1 -> a, d, g
-> stream2 -> b, e, h
-> stream3 -> c, f, i
或更简单地说:
a => stream1
b => stream2
c => stream3
d => stream1
e => stream2
f => stream3
g => stream1
h => stream2
i => stream3
有人知道我该怎么做吗?
您可以迭代 N
并在每次迭代时使用 partition
:
将流分成两部分
import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';
const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);
function split(source, n) {
const streams = [];
let toSplit = source;
for (let k = n; k > 0; --k) {
const [stream, rest] = toSplit.pipe(
partition((_, i) => i % k === 0)
);
streams.push(stream);
toSplit = rest;
}
return streams;
}
const obs = split(source, 3);
const subscribe = merge(
obs[0].pipe(map(val => `1: ${val}`)),
obs[1].pipe(map(val => `2: ${val}`)),
obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));
使用 RxJS 6,
我有一个任意数据流:
[in] -> a, b, c, d, e, f, g, h, i, ....
我想以交替顺序将其拆分为固定数量的 N 个流(在本例中为 3 个输出流):
[out] -> stream1 -> a, d, g
-> stream2 -> b, e, h
-> stream3 -> c, f, i
或更简单地说:
a => stream1
b => stream2
c => stream3
d => stream1
e => stream2
f => stream3
g => stream1
h => stream2
i => stream3
有人知道我该怎么做吗?
您可以迭代 N
并在每次迭代时使用 partition
:
import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';
const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);
function split(source, n) {
const streams = [];
let toSplit = source;
for (let k = n; k > 0; --k) {
const [stream, rest] = toSplit.pipe(
partition((_, i) => i % k === 0)
);
streams.push(stream);
toSplit = rest;
}
return streams;
}
const obs = split(source, 3);
const subscribe = merge(
obs[0].pipe(map(val => `1: ${val}`)),
obs[1].pipe(map(val => `2: ${val}`)),
obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));