忽略已取消的 Observable 的中间排放

Ignore intermediate emissions of canceled Observable

用户可以发起搜索,我使用这种方法链接可变数量的 HTTP 请求:

一切都很好,但我想取消旧搜索,如果他们启动新搜索时它仍然是 运行。像这样:

我能找到的最接近的是 switchAll,它执行 discard/cancel A 第 2 部分,但我将 A 第 1 部分混入结果中。注意:仅包括最后完成的部分,因此如果用户在部分 n 期间开始新搜索,则仅包括 n - 1(针对 n > 1)和 n - 2 以及先验被丢弃。

所以我的问题:

我得到的代码有点乱,但这里有一个粗略的简化大纲:

// User fires this event.
startSearch() {
    this.searchSubject(getUsersCriteria());
}

ngOnInit() {
    // The event is transformed into a search request.
    this.searchSubject
        .asObservable()
        .pipe(
            map((criteria) => search(criteria)),
            switchAll(), // FIXME Not sufficient.
            tap((results) => this.setResults(results))
        )
        .subscribe();
}

/**
 * Execute the search and return the results.
 **/
search(criteria): Observable<Result[]> {
    return this.http.get('https://proprietary.com/endpoint', criteria)
        .pipe(flatMap((results) => {
            // Process results. Conditionally continue.
            if (done()) return of(results);
            else return search(criteria.nextCriteria())
                .pipe(map((moreResults) => [...results, ...moreResults]));
        });
}

注意:目前使用 RxJS 6.

不确定这是否有效,但您可以尝试传入额外的参数作为结果集的起始值。 searchSubject 触发时的所有内容,新的空结果集 [] 将被传递到递归链中。

ngOnInit() {
    // The event is transformed into a search request.
    this.searchSubject
        .asObservable()
        .pipe(
            switchMap((criteria) => search(criteria,[])),
            tap((results) => this.setResults(results))
        )
        .subscribe();
}


/**
 * Execute the search and return the results.
 **/
search(criteria,resultSet): Observable<Result[]> {
    return this.http.get('https://proprietary.com/endpoint', criteria)
        .pipe(flatMap((results) => {
            // Process results. Conditionally continue.
            if (done()) return of(resultSet);                
            return search(criteria.nextCriteria(),resultSet.concat(results))   
        });
}

过度热心的 HTTP 拦截器返回了第一次(取消的)搜索以及(错误地)第二次搜索的结果。

仔细检查 HTTP 拦截器。