如果未发出某个值,n 秒后超时

Timeout after n-seconds if a certain value is not emitted

我正在使用 angularfire2 向 Firebase 推送一个状态为 NEW 的对象。我的后端监听该列表上的写入,并会处理每一个状态为 NEW 的新请求。我想处理 3 种可能的结果:成功(SUCCESS)、错误(ERROR)和超时。

/**
 * Queues an "add book" request and then watches the queued entry's status.
 *
 * Takes the first emitted user, sets it as owner on `book`, pushes the book
 * with status code 'NEW' to the list at ADD_BOOK_QUEUE_PATH, and then follows
 * the pushed entry's `status` until its code is 'SUCCESS' or 'ERROR'.
 *
 * NOTE(review): this is the version reported to time out on every outcome;
 * see the inline notes on the last switchMap and on timeout().
 *
 * @param book the book to enqueue; it is mutated via setOwner().
 * @returns the observable built by the pipe below.
 */
add(book: Book) {
        return this.authentication.user.pipe(
            // Only the first emitted user is used.
            take(1),
            switchMap(user => {
                // Set owner for the backend to handle correctly
                book.setOwner(user.uid);

                // Add book request
                const queueRef = this._afqueue.list(this.ADD_BOOK_QUEUE_PATH);
                const pushPromise = queueRef.push({ status: { code: 'NEW' }, ...book })
                    .then(ref => {
                        console.log('Request to add a new book added to queue.');
                        return ref;
                    }) as Promise<any>;
                // Convert the push promise to an observable so the pushed ref flows down the pipe.
                return from(pushPromise);
            }),
            // Watch the entry that was just pushed, addressed by its key.
            switchMap(ref => {
                return this._afqueue.object(this.ADD_BOOK_QUEUE_PATH + '/' + ref.key)
                    .valueChanges();
            }),
            // Keep only the status field and ignore every code other than SUCCESS / ERROR.
            map(snap => snap['status']),
            filter(status => status['code'] === 'SUCCESS' || status['code'] === 'ERROR'),
            switchMap(status => {
                if (status['code'] === 'SUCCESS') {
                    // NOTE(review): only complete() is called, never next(). In RxJS
                    // complete() takes no argument, so book_id is presumably never
                    // emitted downstream — confirm.
                    return Observable.create(function(observer) {
                        observer.complete(status['book_id']);
                      });
                    //return status['book_id'];
                }
                else if (status['code'] === 'ERROR') {
                    // Throws the raw status error from inside the projection function.
                    throw(status['error']);
                }
            }),
            // NOTE(review): timeout() is the last operator, so it only sees what the
            // switchMap above emits — presumably nothing on SUCCESS, which would
            // explain the timeout firing even after a result arrived.
            timeout(60000), // timeout after 60 seconds
        );
    }

无论我收到的是 ERROR 还是 SUCCESS,超时都会发生。怎样才能只在 60 秒内两者都没有收到时才触发超时?

我这样重写了,效果不错:

/**
 * Queues an "add book" request and resolves with the backend's verdict.
 *
 * Takes the first emitted user, sets it as owner on `book`, pushes the book
 * with status code 'NEW' to the list at ADD_BOOK_QUEUE_PATH, then waits for the
 * pushed entry's status code to become 'SUCCESS' or 'ERROR'.
 *
 * @param book the book to enqueue; it is mutated via setOwner().
 * @returns an observable that emits `status.book_id` on SUCCESS, errors with
 *          `status.error` on ERROR, and errors with a timeout error when
 *          neither status arrives within 60 seconds of subscription.
 */
add(book: Book) {
        return this.authentication.user.pipe(
            // Only the first emitted user is used.
            take(1),
            mergeMap(user => {
                // Set owner for the backend to handle correctly
                book.setOwner(user.uid);

                // Add book request
                const queueRef = this._afqueue.list(this.ADD_BOOK_QUEUE_PATH);
                const pushPromise = queueRef.push({ status: { code: 'NEW' }, ...book })
                    .then(ref => {
                        console.log('Request to add a new book added to queue.');
                        return ref;
                    }) as Promise<any>;
                return from(pushPromise);
            }),
            // Watch the entry that was just pushed, addressed by its key.
            mergeMap(ref => {
                return this._afqueue.object(this.ADD_BOOK_QUEUE_PATH + '/' + ref.key)
                    .valueChanges();
            }),
            map(snap => snap['status']),
            filter(status => (status['code'] === 'SUCCESS' || status['code'] === 'ERROR')),
            // first() stops watching after the first terminal status, and placing
            // timeout() right after it means the timer only fires when no
            // terminal status has arrived at all.
            first(),
            timeout(60000), // timeout after 60 seconds
            mergeMap(status => {
                console.log(status);
                console.log(status['code'])
                if (status['code'] === 'SUCCESS') {
                    return of(status['book_id']);
                }
                // The filter above lets only SUCCESS or ERROR through, so this is
                // the ERROR case. The throwError observable must be *returned*:
                // otherwise the projection yields undefined and RxJS raises a
                // generic TypeError instead of the backend's error.
                return throwError(status['error']);
            })
        );
    }