可观察的 forkJoin 未触发
Observable forkJoin not firing
我正在尝试在两个 Observable 上使用 forkJoin
。其中一个以流的形式开始...如果我直接订阅它们,我会收到响应,但 forkJoin
没有触发。有什么想法吗?
private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();
....
this.data$ = this.queryStream
.startWith('')
.flatMap(queryInput => {
this.query = queryInput
return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
})
.share();
...
Observable.forkJoin(this.statuses$, this.companies$)
.subscribe(res => {
console.log('forkjoin');
this._countStatus(res[0], res[1]);
});
// This shows arrays in the console...
this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));
// In the console
Array[9]
Array[6]
forkJoin
的一个非常常见的问题是它要求所有源 Observable 至少发出一项并且所有这些都必须完成。
换句话说,如果 this.statuses$
或 this.companies$
不发射任何物品并且在它们都完成之前 forkJoin
不会发射任何东西。
this.statuses$.subscribe(
res => console.log(res),
undefined,
() => console.log('completed'),
);
Observable.forkJoin([
_someService.getUsers(),
_someService.getCustomers(),
])
.subscribe((data: [Array<User>, Array<Customer>]) => {
let users: Array<User> = data[0];
let customer: Array<Customer> = data[1];
}, err => {
});
//someService
getUsers():Observable<User> {
let url = '/users';
return this._http.get(url, headers)
.map(res => res.json());
}
getCustomers():Observable<Customer> {
let url = '/customers';
return this._http.get(url, headers)
.map(res => res.json());
}
forkJoin
仅在所有内部可观察对象完成时发出。
如果你需要一个相当于 forkJoin
的只听来自每个来源的单一发射,使用 combineLatest
+ take(1)
combineLatest(
this.statuses$,
this.companies$,
)
.pipe(
take(1),
)
.subscribe(([statuses, companies]) => {
console.log('forkjoin');
this._countStatus(statuses, companies);
});
只要两个来源都发出,combineLatest
就会发出,take(1)
会立即取消订阅。
forkJoin
没有用,所以我使用下面的代码来解决我的问题。使用mergeMap
可以将外层订阅的结果映射到内层订阅,随心所欲地订阅
this.statuses$.pipe(
mergeMap(source => this.companies$.pipe(
map(inner => [source , inner])
)
)
).subscribe(([e , r]) => {
console.log(e , r);
})
对我来说,combineLatest
运算符就是解决方案!
将 .pipe(take(1))
附加为 asObservable()
类可观察对象的管道即可。
forkJoin({
l0: this._svc.data$.pipe(take(1)),
l1: this._api.getLogman1(),
l2: this._api.getLogman2(),
l3: this._api.getLogman3(),
})
.pipe(
takeUntil(this._unsubscribeAll),
)
.subscribe(x => {
console.log(x);
});
我正在尝试在两个 Observable 上使用 forkJoin
。其中一个以流的形式开始...如果我直接订阅它们,我会收到响应,但 forkJoin
没有触发。有什么想法吗?
private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();
....
this.data$ = this.queryStream
.startWith('')
.flatMap(queryInput => {
this.query = queryInput
return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
})
.share();
...
Observable.forkJoin(this.statuses$, this.companies$)
.subscribe(res => {
console.log('forkjoin');
this._countStatus(res[0], res[1]);
});
// This shows arrays in the console...
this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));
// In the console
Array[9]
Array[6]
forkJoin
的一个非常常见的问题是它要求所有源 Observable 至少发出一项并且所有这些都必须完成。
换句话说,如果 this.statuses$
或 this.companies$
不发射任何物品并且在它们都完成之前 forkJoin
不会发射任何东西。
this.statuses$.subscribe(
res => console.log(res),
undefined,
() => console.log('completed'),
);
Observable.forkJoin([
_someService.getUsers(),
_someService.getCustomers(),
])
.subscribe((data: [Array<User>, Array<Customer>]) => {
let users: Array<User> = data[0];
let customer: Array<Customer> = data[1];
}, err => {
});
//someService
getUsers():Observable<User> {
let url = '/users';
return this._http.get(url, headers)
.map(res => res.json());
}
getCustomers():Observable<Customer> {
let url = '/customers';
return this._http.get(url, headers)
.map(res => res.json());
}
forkJoin
仅在所有内部可观察对象完成时发出。
如果你需要一个相当于 forkJoin
的只听来自每个来源的单一发射,使用 combineLatest
+ take(1)
combineLatest(
this.statuses$,
this.companies$,
)
.pipe(
take(1),
)
.subscribe(([statuses, companies]) => {
console.log('forkjoin');
this._countStatus(statuses, companies);
});
只要两个来源都发出,combineLatest
就会发出,take(1)
会立即取消订阅。
forkJoin
没有用,所以我使用下面的代码来解决我的问题。使用mergeMap
可以将外层订阅的结果映射到内层订阅,随心所欲地订阅
this.statuses$.pipe(
mergeMap(source => this.companies$.pipe(
map(inner => [source , inner])
)
)
).subscribe(([e , r]) => {
console.log(e , r);
})
对我来说,combineLatest
运算符就是解决方案!
将 .pipe(take(1))
附加为 asObservable()
类可观察对象的管道即可。
forkJoin({
l0: this._svc.data$.pipe(take(1)),
l1: this._api.getLogman1(),
l2: this._api.getLogman2(),
l3: this._api.getLogman3(),
})
.pipe(
takeUntil(this._unsubscribeAll),
)
.subscribe(x => {
console.log(x);
});