RxJS5 => 订阅者的 onCompleted 回调没有触发
RxJS5 => subscriber's onCompleted callback not firing
我在队列中有这两种方法。我已经实现了某种形式的背压,由此从方法创建的可观察对象只会在用户触发回调时触发事件,所有这些都是通过可观察对象进行的。问题是我无法让 onCompleted 处理程序在 drain()
的主订阅者中触发。令我惊讶的是 onNext 会为同一个订阅者触发,那么为什么 onCompleted 不会触发呢?我认为在 takeUntil 调用和订阅者中的 onCompleted 处理程序将触发的重手 $obs.complete() 之间...
Queue.prototype.isEmpty = function (obs) {
if (!obs) {
// this is just a dummy observable
// I wish Rx had Rx.Observable.dummy() alongside
// Rx.Observable.empty(), but oh well
obs = Rx.Observable.of('dummy');
}
return this.init()
.flatMap(() => {
return obs; // when you call obs.next(), it should fire this chain again
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
})
})
.flatMap(obj => {
return findFirstLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => {
console.log(' => LLLL1 => ', l);
return l;
});
});
})
.filter(l => {
// filter out any lines => only fire event if there is no line
return !l;
})
.map(() => {
// the queue is now empty
obs.complete(); // <<<<<<<<<< note this call
return {isEmpty: true}
});
};
Queue.prototype.drain = function (obs, opts) {
opts = opts || {};
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
let $obs = obs.takeUntil(this.isEmpty(obs))
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => l);
});
});
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
return $obs;
};
是什么导致了绝对的疯狂,当我像这样调用上面的内容时,我无法触发 onCompleted 回调:
const q = new Queue();
const obs = new Rx.Subject();
q.drain(obs).subscribe(
function (v) {
console.log('end result => ', colors.yellow(util.inspect(v)));
setTimeout(function () {
// the following call serves as the callback which will fire the observables in the methods again
obs.next();
}, 100);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
);
obs.subscribe(
function (v) {
console.log('next item that was drained => ', v);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this gets called!
console.log(colors.red(' => obs on completed => '), c);
}
);
当我调用上面的方法时,我得到了这个:
next item that was drained => foo foo foo
next item that was drained => bar bar bar
=> obs on completed => undefined
我只得到这 3 行的原因是因为我这样做了:
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
但是为什么不会显式调用$obs.complete();
触发这个回调:
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
?
好吧,我想我明白了,这个 RxJS 库多么疯狂
最有可能做正确的事情,你应该使用 take() 或 takeUntil() 或类似的方法
所以我这样做了:
Queue.prototype.drain = function (obs, opts) {
if (!(obs instanceof Rx.Observable)) {
opts = obs || {};
obs = new Rx.Subject();
}
else {
opts = opts || {};
}
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
process.nextTick(function () {
obs.next();
});
let $obs = obs
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => ({data: l, cb: obs.next.bind(obs)}));
});
})
// here is the key part!
.takeUntil(this.isEmpty(obs));
return $obs;
};
这似乎成功了。有一段时间我很绝望。如果您想进一步了解这是如何工作的,请在内部查询。
我在队列中有这两种方法。我已经实现了某种形式的背压,由此从方法创建的可观察对象只会在用户触发回调时触发事件,所有这些都是通过可观察对象进行的。问题是我无法让 onCompleted 处理程序在 drain()
的主订阅者中触发。令我惊讶的是 onNext 会为同一个订阅者触发,那么为什么 onCompleted 不会触发呢?我认为在 takeUntil 调用和订阅者中的 onCompleted 处理程序将触发的重手 $obs.complete() 之间...
Queue.prototype.isEmpty = function (obs) {
if (!obs) {
// this is just a dummy observable
// I wish Rx had Rx.Observable.dummy() alongside
// Rx.Observable.empty(), but oh well
obs = Rx.Observable.of('dummy');
}
return this.init()
.flatMap(() => {
return obs; // when you call obs.next(), it should fire this chain again
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
})
})
.flatMap(obj => {
return findFirstLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => {
console.log(' => LLLL1 => ', l);
return l;
});
});
})
.filter(l => {
// filter out any lines => only fire event if there is no line
return !l;
})
.map(() => {
// the queue is now empty
obs.complete(); // <<<<<<<<<< note this call
return {isEmpty: true}
});
};
Queue.prototype.drain = function (obs, opts) {
opts = opts || {};
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
let $obs = obs.takeUntil(this.isEmpty(obs))
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => l);
});
});
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
return $obs;
};
是什么导致了绝对的疯狂,当我像这样调用上面的内容时,我无法触发 onCompleted 回调:
const q = new Queue();
const obs = new Rx.Subject();
q.drain(obs).subscribe(
function (v) {
console.log('end result => ', colors.yellow(util.inspect(v)));
setTimeout(function () {
// the following call serves as the callback which will fire the observables in the methods again
obs.next();
}, 100);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
);
obs.subscribe(
function (v) {
console.log('next item that was drained => ', v);
},
function (e) {
console.log('on error => ', e);
},
function (c) {
// this gets called!
console.log(colors.red(' => obs on completed => '), c);
}
);
当我调用上面的方法时,我得到了这个:
next item that was drained => foo foo foo
next item that was drained => bar bar bar
=> obs on completed => undefined
我只得到这 3 行的原因是因为我这样做了:
process.nextTick(function(){
obs.next('foo foo foo');
$obs.next('bar bar bar');
$obs.complete();
});
但是为什么不会显式调用$obs.complete();
触发这个回调:
function (c) {
// this never gets called and it is driving me f*cking crazy
console.log(colors.red(' DRAIN on completed => '), c);
}
?
好吧,我想我明白了,这个 RxJS 库多么疯狂
最有可能做正确的事情,你应该使用 take() 或 takeUntil() 或类似的方法
所以我这样做了:
Queue.prototype.drain = function (obs, opts) {
if (!(obs instanceof Rx.Observable)) {
opts = obs || {};
obs = new Rx.Subject();
}
else {
opts = opts || {};
}
const isConnect = opts.isConnect || false;
const delay = opts.delay || 500;
process.nextTick(function () {
obs.next();
});
let $obs = obs
.flatMap(() => {
return this.init();
})
.flatMap(() => {
return acquireLock(this)
.flatMap(obj => {
return acquireLockRetry(this, obj)
});
})
.flatMap(obj => {
return removeOneLine(this)
.flatMap(l => {
return releaseLock(this, obj.id)
.map(() => ({data: l, cb: obs.next.bind(obs)}));
});
})
// here is the key part!
.takeUntil(this.isEmpty(obs));
return $obs;
};
这似乎成功了。有一段时间我很绝望。如果您想进一步了解这是如何工作的,请在内部查询。