RxJS5 groupBy 从第一组中删除第一项?
RxJS5 groupBy dropping the first item from first group?
我很难理解为什么这段代码会删除第一个元素?
const Rx = require("@reactivex/rxjs")
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
requestQueue.groupBy(request => request.type).bufferCount(2).mergeMap(requestSubstreams => {
return requestSubstreams[0].merge(requestSubstreams[1])
}).subscribe(x => console.log(x))
我得到的输出是:
{ type: 'tweets.json', value: 'A' }
{ type: 'show.json', value: '2' }
{ type: 'tweets.json', value: 'B' }
做了一些实验,它总是第一个被丢弃的元素。
我必须求助于此代码(使用分区)才能获得我期望的结果:
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
const requestSubstreams = requestQueue.partition(request => request.type == "show.json")
const showJsonSubstream = requestSubstreams[0]
const tweetsJsonSubstream = requestSubstreams[1]
showJsonSubstream.merge(tweetsJsonSubstream).subscribe(x => console.log(x))
你能帮忙指出 groupBy 的问题/注意事项吗?
致@cartant:它与此代码有关(在此代码中),字母和数字来自不同的可观察对象。
const letterArr = ["A", "B", "C", "D", "E", "F", "G", "H", "I"]
const letterStream = Rx.Observable.zip(
Rx.Observable.from(letterArr),
Rx.Observable.range(0, letterArr.length - 1),
(letter, index) => Rx.Observable.of({letter, index}).delay(
index == 0 ? 0 : index < 5 ? (index == 2 ? 2000 : 5000) : 20000000
)
).concatAll().multicast(new Rx.Subject())
//A emitted at 0s, B at 5s, C at 7s, D at 12s, E at 17s, F at ... never
const numberStream = Rx.Observable.zip(
Rx.Observable.range(1, 20),
Rx.Observable.interval(2000).startWith(-1).map(x => x +1),
(number, index) => ({number, index})
).multicast(new Rx.Subject())
//1 emitted at 0s, 2 at 2s, 3 at 4s, ...
const slowedDownNumberStream = Rx.Observable.zip(
numberStream,
Rx.Observable.zip(
letterStream,
Rx.Observable.interval(10000).startWith(-1).map(x => x +1)
).timeoutWith(11000, Rx.Observable.interval(2000).startWith(-1)),
(number, index) => ({number, index})
)
Rx.Observable.merge(letterStream, slowedDownNumberStream).subscribe(x => console.log(new Date(), " > ", x))
letterStream.connect()
numberStream.connect()
你看,我在slowedDownNumberStream中使用了letterStream。
我现在的情况是,我只有一个流作为输入(其中字母和数字混合在一起),所以我需要使用 groupBy... 对它们进行分区,并且由于我需要将两者都放在 mergeMap 中,所以我把groupBy 和 mergeMap 之间的 bufferCount。
我很难理解为什么这段代码会删除第一个元素?
const Rx = require("@reactivex/rxjs")
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
requestQueue.groupBy(request => request.type).bufferCount(2).mergeMap(requestSubstreams => {
return requestSubstreams[0].merge(requestSubstreams[1])
}).subscribe(x => console.log(x))
我得到的输出是:
{ type: 'tweets.json', value: 'A' }
{ type: 'show.json', value: '2' }
{ type: 'tweets.json', value: 'B' }
做了一些实验,它总是第一个被丢弃的元素。
我必须求助于此代码(使用分区)才能获得我期望的结果:
const requestQueue = Rx.Observable.from([
{type: "show.json", value: "1"},
{type: "tweets.json", value: "A"},
{type: "show.json", value: "2"},
{type: "tweets.json", value: "B"}
])
const requestSubstreams = requestQueue.partition(request => request.type == "show.json")
const showJsonSubstream = requestSubstreams[0]
const tweetsJsonSubstream = requestSubstreams[1]
showJsonSubstream.merge(tweetsJsonSubstream).subscribe(x => console.log(x))
你能帮忙指出 groupBy 的问题/注意事项吗?
致@cartant:它与此代码有关(在此代码中),字母和数字来自不同的可观察对象。
const letterArr = ["A", "B", "C", "D", "E", "F", "G", "H", "I"]
const letterStream = Rx.Observable.zip(
Rx.Observable.from(letterArr),
Rx.Observable.range(0, letterArr.length - 1),
(letter, index) => Rx.Observable.of({letter, index}).delay(
index == 0 ? 0 : index < 5 ? (index == 2 ? 2000 : 5000) : 20000000
)
).concatAll().multicast(new Rx.Subject())
//A emitted at 0s, B at 5s, C at 7s, D at 12s, E at 17s, F at ... never
const numberStream = Rx.Observable.zip(
Rx.Observable.range(1, 20),
Rx.Observable.interval(2000).startWith(-1).map(x => x +1),
(number, index) => ({number, index})
).multicast(new Rx.Subject())
//1 emitted at 0s, 2 at 2s, 3 at 4s, ...
const slowedDownNumberStream = Rx.Observable.zip(
numberStream,
Rx.Observable.zip(
letterStream,
Rx.Observable.interval(10000).startWith(-1).map(x => x +1)
).timeoutWith(11000, Rx.Observable.interval(2000).startWith(-1)),
(number, index) => ({number, index})
)
Rx.Observable.merge(letterStream, slowedDownNumberStream).subscribe(x => console.log(new Date(), " > ", x))
letterStream.connect()
numberStream.connect()
你看,我在slowedDownNumberStream中使用了letterStream。
我现在的情况是,我只有一个流作为输入(其中字母和数字混合在一起),所以我需要使用 groupBy... 对它们进行分区,并且由于我需要将两者都放在 mergeMap 中,所以我把groupBy 和 mergeMap 之间的 bufferCount。