从 Observable 使用 startWith 创建 Observable
Creating an Observable using startWith from an Observable
我正在用另一个 Observable
的输入过滤一个 Observable
- 过滤的输入来自用户。
过滤是使用 RxJS 运算符完成的 combineLatest
。使用这意味着当订阅这个流时,在两个源 Observables 都发出发射之前不会发出任何值 - 我希望创建的流在创建时发出(没有任何过滤),然后再进行任何用户输入。
我想我应该使用 startWith
运算符,以便流在创建时产生发射,但我不知道如何从 Observable 中播种它。使用 Observable,因为数据来自 Firebase 并使用 FirebaseListObservable
处理。
下面是我目前正在做的事情的拼凑版本。
let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value));
let allTags: Observable<any[]> = getAllTags();
let filteredTags = allTags
.combineLatest(tagExclusionStream, (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
// I want this to print out without needing the tagExclusionStream to emit first
filteredTags.subscribe(tags => console.log("Tags:", tags))
请告诉我我的方法是否完全off/there是更好的方法,因为我是 RxJS 的新手。
我认为这可以解决问题:
let filteredTags = allTags
.combineLatest(tagExclusionStream.startWith(''), (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
或者,如果您在不同的地方使用 tagExclusionStream
,您可以这样做:
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value))
.startWith('');
startWith
只是在使用 concat
internally。
也一样
const startValue$ = of('start with this')
concat(startValue$, source$)
或者自己编写运算符
function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
return (source: Observable<T>) => concat(start, source)
}
source$.pipe(
startFrom(startValue$)
)
确保startValue$
完成
一旦 startValue$
完成,新的 Observable 将切换到 source$
。
因此,如果开始的 Observable 是一个长期存在的 Observable,例如ReplaySubject
你只关心最近的值,你必须确保你传入的 Observable 完成,例如take(1)
.
const { ReplaySubject, concat, of } = rxjs;
const { take } = rxjs.operators;
const source$ = of('source-1', 'source-2')
const replay = new ReplaySubject(1)
replay.next('subject-1')
replay.next('subject-2')
const startWith$ = replay.pipe(take(1)) // <-- take one item and complete
const o$ = concat(startWith$, source$)
o$.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>
我正在用另一个 Observable
的输入过滤一个 Observable
- 过滤的输入来自用户。
过滤是使用 RxJS 运算符完成的 combineLatest
。使用这意味着当订阅这个流时,在两个源 Observables 都发出发射之前不会发出任何值 - 我希望创建的流在创建时发出(没有任何过滤),然后再进行任何用户输入。
我想我应该使用 startWith
运算符,以便流在创建时产生发射,但我不知道如何从 Observable 中播种它。使用 Observable,因为数据来自 Firebase 并使用 FirebaseListObservable
处理。
下面是我目前正在做的事情的拼凑版本。
let tagInput = document.getElementById('tags');
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value));
let allTags: Observable<any[]> = getAllTags();
let filteredTags = allTags
.combineLatest(tagExclusionStream, (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
// I want this to print out without needing the tagExclusionStream to emit first
filteredTags.subscribe(tags => console.log("Tags:", tags))
请告诉我我的方法是否完全off/there是更好的方法,因为我是 RxJS 的新手。
我认为这可以解决问题:
let filteredTags = allTags
.combineLatest(tagExclusionStream.startWith(''), (tags, tagExclusions) => {
return tags.filter((tag: any) => tagExclusions.indexOf(tag.$key) == -1)
});
或者,如果您在不同的地方使用 tagExclusionStream
,您可以这样做:
let tagExclusionStream = Observable
.fromEvent(tagInput, 'input')
.map((e: any) => createsArrayFromInput(e.target.value))
.startWith('');
startWith
只是在使用 concat
internally。
也一样
const startValue$ = of('start with this')
concat(startValue$, source$)
或者自己编写运算符
function startFrom<T, O extends ObservableInput<any>>(start: O): OperatorFunction<T, T | ObservedValueOf<O>> {
return (source: Observable<T>) => concat(start, source)
}
source$.pipe(
startFrom(startValue$)
)
确保startValue$
完成
一旦 startValue$
完成,新的 Observable 将切换到 source$
。
因此,如果开始的 Observable 是一个长期存在的 Observable,例如ReplaySubject
你只关心最近的值,你必须确保你传入的 Observable 完成,例如take(1)
.
const { ReplaySubject, concat, of } = rxjs;
const { take } = rxjs.operators;
const source$ = of('source-1', 'source-2')
const replay = new ReplaySubject(1)
replay.next('subject-1')
replay.next('subject-2')
const startWith$ = replay.pipe(take(1)) // <-- take one item and complete
const o$ = concat(startWith$, source$)
o$.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>