RxJS5 => 订阅者的 onCompleted 回调没有触发

RxJS5 => subscriber's onCompleted callback not firing

我在队列中有这两种方法。我已经实现了某种形式的背压,由此从方法创建的可观察对象只会在用户触发回调时触发事件,所有这些都是通过可观察对象进行的。问题是我无法让 onCompleted 处理程序在 drain() 的主订阅者中触发。令我惊讶的是 onNext 会为同一个订阅者触发,那么为什么 onCompleted 不会触发呢?我认为在 takeUntil 调用和订阅者中的 onCompleted 处理程序将触发的重手 $obs.complete() 之间...

Queue.prototype.isEmpty = function (obs) {

    if (!obs) {
        // this is just a dummy observable
        // I wish Rx had Rx.Observable.dummy() alongside
        // Rx.Observable.empty(), but oh well
        obs = Rx.Observable.of('dummy');
    }

    return this.init()
        .flatMap(() => {
            return obs; // when you call obs.next(), it should fire this chain again
        })
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(this, obj)
                })
        })
        .flatMap(obj => {
            return findFirstLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(() => {
                            console.log(' => LLLL1 => ', l);
                            return l;
                        });
                });
        })
        .filter(l => {
            // filter out any lines => only fire event if there is no line

            return !l;
        })
        .map(() => {
            //  the queue is now empty
            obs.complete(); // <<<<<<<<<< note this call
            return {isEmpty: true}
        });


};


Queue.prototype.drain = function (obs, opts) {

    opts = opts || {};

    const isConnect = opts.isConnect || false;
    const delay = opts.delay || 500;

    let $obs = obs.takeUntil(this.isEmpty(obs))
        .flatMap(() => {
            return this.init();
        })
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(this, obj)
                });
        })
        .flatMap(obj => {
            return removeOneLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(() => l);
                });
        });


    process.nextTick(function(){
        obs.next('foo foo foo');
        $obs.next('bar bar bar');
        $obs.complete();
    });


    return $obs;

};

是什么导致了绝对的疯狂,当我像这样调用上面的内容时,我无法触发 onCompleted 回调:

const q = new Queue();

const obs = new Rx.Subject();

q.drain(obs).subscribe(

    function (v) {

        console.log('end result => ', colors.yellow(util.inspect(v)));

        setTimeout(function () {
            // the following call serves as the callback which will fire the observables in the methods again
            obs.next();
        }, 100);

    },
    function (e) {
        console.log('on error => ', e);
    },
    function (c) {
        // this never gets called and it is driving me f*cking crazy
        console.log(colors.red(' DRAIN on completed => '), c);
    }

);

obs.subscribe(
    function (v) {
        console.log('next item that was drained => ', v);
    },
    function (e) {
        console.log('on error => ', e);
    },
    function (c) {
        // this gets called!
        console.log(colors.red(' => obs on completed => '), c);
    }
);

当我调用上面的方法时,我得到了这个:

next item that was drained =>  foo foo foo
next item that was drained =>  bar bar bar
 => obs on completed =>  undefined

我只得到这 3 行的原因是因为我这样做了:

process.nextTick(function(){
    obs.next('foo foo foo');
    $obs.next('bar bar bar');
    $obs.complete();
}); 

但是为什么不会显式调用$obs.complete();触发这个回调:

 function (c) {
            // this never gets called and it is driving me f*cking crazy
            console.log(colors.red(' DRAIN on completed => '), c);
        }

?

好吧,我想我明白了,这个 RxJS 库多么疯狂

最有可能做正确的事情,你应该使用 take() 或 takeUntil() 或类似的方法

所以我这样做了:

Queue.prototype.drain = function (obs, opts) {

    if (!(obs instanceof Rx.Observable)) {
        opts = obs || {};
        obs = new Rx.Subject();
    }
    else {
        opts = opts || {};
    }


    const isConnect = opts.isConnect || false;
    const delay = opts.delay || 500;

    process.nextTick(function () {
        obs.next();
    });


    let $obs = obs
        .flatMap(() => {
            return this.init();
        })
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(this, obj)
                });
        })
        .flatMap(obj => {
            return removeOneLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(() => ({data: l, cb: obs.next.bind(obs)}));
                });
        })
        //  here is the key part!
        .takeUntil(this.isEmpty(obs));


    return $obs;

};

这似乎成功了。有一段时间我很绝望。如果您想进一步了解这是如何工作的,请在内部查询。