RxJava / RxJs:如何合并两个源可观察对象但在其中一个完成后立即完成
RxJava / RxJs: How to merge two source observables but complete as soon as one of them completes
我有两个源可观察量。
我想合并两个源 observable,但是合并的 observable 应该在其中一个源 observable 完成后立即完成。
期望的行为:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4--x
如果其中一个来源出现错误,错误应该传播到合并的可观察对象:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged" ---1---2----3--4--ex
"merge" 运算符仅在两个源都已完成时才完成合并流:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4-----------------------------x
我怎样才能达到我想要的行为?
我当然希望其他人用更优雅的方法来回答,但这很有效。
我认为您必须使用 take
运算符之一。您可以在一个源完成时完成所有源,如下所示:
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
Rx.Observable.merge(a.takeUntil(b.last()), b.takeUntil(a.last()))
.subscribe(
x => { console.log('next', x); },
null,
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
或者可读性较差但可扩展性更强的版本:
function merge(...obs) {
return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
merge(a, b)
.subscribe(
x => { console.log('next', x); },
null,
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
这是错误传播的示例:
function merge(...obs) {
return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
const c = Rx.Observable.timer(2200).map(x => { throw 'oops!'; });
merge(a, b, c)
.subscribe(
x => { console.log('next', x); },
x => { console.log('error', x); },
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
在合并的外部使用 takeUntil
很棘手,因为您会丢失最后发出的值。
您需要处理元数据,即有关每个可观察对象的信息。为此,请在每个流上使用 materialize()
运算符,并在合并流上使用 dematerialize()
以实际发出数据。
Observable.merge( observableA.materialize(),
observableB.materialize() )
.takeWhile( notification -> notification.hasValue() )
.dematerialize()
.subscribe( ... );
这将合并两个可观察对象,直到其中一个完成或发出错误。
当一个 observable 完成时,它不会发出值,但我们可以 concat
它与另一个发出单个值的 'signal' observable。然后我们可以使用 takeWhile
运算符观察 'signal' observable 的值。
当然你必须确保 'signal' observable 的发射值不是被合并的 observable 发射的值 - 如果 takeWhile
谓词通过引用进行比较。
这是一个例子:
const obs1$ = Rx.Observable.interval(1000)
.map(x => `obs1: ${x}`)
.take(5);
const obs2$ = Rx.Observable.interval(300)
.map(x => `obs2: ${x}`)
.take(9);
const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);
Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
.takeWhile(x => x !== signalFinishMessage)
.subscribe(
x => console.log(x),
err => console.log('received error:', err),
() => console.log('complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
错误也会传播:
const obs1$ = Rx.Observable.interval(1000)
.map(x => `obs1: ${x}`)
.take(5);
const obs2$ = Rx.Observable.interval(300)
.map(x => `obs2: ${x}`)
.take(9)
.concat(Rx.Observable.throw(`the world's about to end`));
const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);
Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
.takeWhile(x => x !== signalFinishMessage)
.subscribe(
x => console.log(x),
err => console.log('received error:', err),
() => console.log('complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
我最后自己动手了:
import { Observable } from 'rxjs';
export function whileAll<T>(...observables: Observable<T>[]): Observable<T> {
return new Observable<T>(function (observer) {
if (observables.length === 0)
observer.complete();
else {
const next = observer.next.bind(observer);
const error = observer.error.bind(observer);
const complete = observer.complete.bind(observer);
for (let i = 0; i < observables.length; i++)
observer.add(observables[i].subscribe(next, error, complete));
}
});
}
我有两个源可观察量。 我想合并两个源 observable,但是合并的 observable 应该在其中一个源 observable 完成后立即完成。
期望的行为:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4--x
如果其中一个来源出现错误,错误应该传播到合并的可观察对象:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged" ---1---2----3--4--ex
"merge" 运算符仅在两个源都已完成时才完成合并流:
Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged" ---1---2----3--4-----------------------------x
我怎样才能达到我想要的行为?
我当然希望其他人用更优雅的方法来回答,但这很有效。
我认为您必须使用 take
运算符之一。您可以在一个源完成时完成所有源,如下所示:
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
Rx.Observable.merge(a.takeUntil(b.last()), b.takeUntil(a.last()))
.subscribe(
x => { console.log('next', x); },
null,
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
或者可读性较差但可扩展性更强的版本:
function merge(...obs) {
return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
merge(a, b)
.subscribe(
x => { console.log('next', x); },
null,
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
这是错误传播的示例:
function merge(...obs) {
return Rx.Observable.merge(...obs.map(x => x.takeUntil(Rx.Observable.race(obs.filter(y => y !== x).map(z => z.last())))));
}
const a = Rx.Observable.interval(1000).take(3).map(x => `a${x}`);
const b = Rx.Observable.interval(800).take(6).map(x => `b${x}`);
const c = Rx.Observable.timer(2200).map(x => { throw 'oops!'; });
merge(a, b, c)
.subscribe(
x => { console.log('next', x); },
x => { console.log('error', x); },
() => { console.log('complete'); }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
在合并的外部使用 takeUntil
很棘手,因为您会丢失最后发出的值。
您需要处理元数据,即有关每个可观察对象的信息。为此,请在每个流上使用 materialize()
运算符,并在合并流上使用 dematerialize()
以实际发出数据。
Observable.merge( observableA.materialize(),
observableB.materialize() )
.takeWhile( notification -> notification.hasValue() )
.dematerialize()
.subscribe( ... );
这将合并两个可观察对象,直到其中一个完成或发出错误。
当一个 observable 完成时,它不会发出值,但我们可以 concat
它与另一个发出单个值的 'signal' observable。然后我们可以使用 takeWhile
运算符观察 'signal' observable 的值。
当然你必须确保 'signal' observable 的发射值不是被合并的 observable 发射的值 - 如果 takeWhile
谓词通过引用进行比较。
这是一个例子:
const obs1$ = Rx.Observable.interval(1000)
.map(x => `obs1: ${x}`)
.take(5);
const obs2$ = Rx.Observable.interval(300)
.map(x => `obs2: ${x}`)
.take(9);
const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);
Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
.takeWhile(x => x !== signalFinishMessage)
.subscribe(
x => console.log(x),
err => console.log('received error:', err),
() => console.log('complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
错误也会传播:
const obs1$ = Rx.Observable.interval(1000)
.map(x => `obs1: ${x}`)
.take(5);
const obs2$ = Rx.Observable.interval(300)
.map(x => `obs2: ${x}`)
.take(9)
.concat(Rx.Observable.throw(`the world's about to end`));
const signalFinishMessage = {};
const signalFinish$ = Rx.Observable.of(signalFinishMessage);
Rx.Observable.merge(obs1$.concat(signalFinish$), obs2$.concat(signalFinish$))
.takeWhile(x => x !== signalFinishMessage)
.subscribe(
x => console.log(x),
err => console.log('received error:', err),
() => console.log('complete')
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.5/Rx.min.js"></script>
我最后自己动手了:
import { Observable } from 'rxjs';
export function whileAll<T>(...observables: Observable<T>[]): Observable<T> {
return new Observable<T>(function (observer) {
if (observables.length === 0)
observer.complete();
else {
const next = observer.next.bind(observer);
const error = observer.error.bind(observer);
const complete = observer.complete.bind(observer);
for (let i = 0; i < observables.length; i++)
observer.add(observables[i].subscribe(next, error, complete));
}
});
}