RxJS:如何在传递下一个有效值之前进行一些清理?
RxJS: How to do some clean-up before the next valid value is passed?
我必须合并流以获得 URL 来加载图像:一个流用于拖放事件,一个流用于文件输入更改。在每个新路径上,我加载此图像并将其绘制到 canvas。这个 canvas 被传递到另一个流中。它看起来像这样:
// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
event.preventDefault();
});
});
// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
return event.dataTransfer.files[0].path;
})).map(function(path) {
var image = new Image();
image.src = path;
// note: I return an Observable in this map function
// is this even good practice? if yes, is mergeAll the best
// way to get the "load" event?
return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
return event.path[0];
}).subscribe(function(image) {
var canvas = document.createElement('canvas');
var context = canvas.getContext('2d');
canvas.width = image.width;
canvas.height = image.height;
context.drawImage(image, 0, 0);
canvasState.onNext(canvas);
});
(附带问题:"allowed" 到 return Observables 在 map
中吗?)
我的 canvasState
看起来像这样:
var canvasState = new Rx.BehaviorSubject(undefined);
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
如您所见,如果我的 canvas 值为真,我将 canvas 附加到正文。但是每次出现新的 canvas 时,我都想删除旧的。实现这一目标的最佳方法是什么?这样的事情可能吗?
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).beforeNext(function(oldCanvas) {
// remove old one
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
嵌套的 Observables
是的,map
操作 return Observable 是正常的。这为您提供了可观察流的可观察流。有 Rx 3 个操作可以将嵌套的可观察对象展平一层:
- mergeAll - 订阅所有到达的内部流。您通常最终会同时订阅多个流,并且通常您的最终流会将来自不同内部流的结果混合在一起。您可以提供
maxConcurrent
参数来限制并发内部订阅的数量。当达到限制时,新的内部 observable 会排队,并且在先前的内部 observable 完成之前不会被订阅。
- concatAll - 按顺序一次订阅每个内部可观察流。您的结果流将以可预测的顺序生成项目。 Concat 只是
Merge
,maxConcurrent 设置为 1。
- switch - 订阅第一个内部流。然后,当新流到达时,"switch" 到它。基本上取消订阅之前的内流,订阅最新的内流。如果只想收听最新的内部流,请使用
switch
。
(在我的代码中,我还实现了第四个扁平化运算符:concatLatest
,类似于 concat
,但是当内部流完成时,而不是跳转到中的下一个流队列,它直接跳转到队列中的最新序列,丢弃任何跳过的内部流。它的行为介于 concat
和 switch
之间,我发现它在处理背压时非常有用仍在产生结果)。
所以,.map(...).mergeAll()
或.map(...).concatAll()
或.map(...).switch()
是很常见的。它是如此常见,以至于有针对这 3 种情况的 Rx 方法:
source.flatMap(selector)
等同于 source.map(selector).mergeAll()
source.concatMap(selector)
等同于 source.map(selector).concatAll()
source.flatMapLatest(selector)
等同于 source.map(selector).switch()
您的代码
根据以上信息,我建议您使用 switch
而不是 merge
。按照您现在的代码,如果有人快速更改值,您可能会同时发出 2 个图像请求,如果它们以错误的顺序完成,您最终会得到错误的最终结果 canvas。 switch
将取消陈旧的图片请求并切换到新的。
请注意,如果用户更改值的速度一直快于图像请求完成的速度,那么他们将永远看不到任何图像,因为您将不断切换到新图像。这是我通常使用我的 concatLatest
的地方,因为它会不时地完成一些中间请求,同时它正在尝试跟上。 YMMV.
清理旧的 canvases
当我的可观察流有一个 map 操作产生必须清理的资源时,我倾向于使用 scan
因为它可以为我跟踪以前的值(不需要我去创建一个闭包变量保持以前的值):
canvasState
.filter(function (c) { return !!c; })
.scan({}, function (acc, canvas) {
return { previous: acc.canvas, canvas: canvas };
})
.subscribe(function (v) {
// remove the old canvas
if (v.previous) { document.body.removeChild(v.previous); }
// add the new one
document.body.appendChild(v.canvas);
});
我必须合并流以获得 URL 来加载图像:一个流用于拖放事件,一个流用于文件输入更改。在每个新路径上,我加载此图像并将其绘制到 canvas。这个 canvas 被传递到另一个流中。它看起来像这样:
// prevent browsers default behavior for dropTargetElement
[ 'drop', 'dragover' ].forEach(function(eventName) {
Rx.Observable.fromEvent(dropTargetElement, eventName).subscribe(function(event) {
event.preventDefault();
});
});
// file path stream merged from openFileInputs change and openFileDropTargets drop
Rx.Observable.merge(Rx.Observable.fromEvent(inputElement, 'change').map(function(event) {
return event.target.value;
}), Rx.Observable.fromEvent(dropTargetElement, 'drop').map(function(event) {
return event.dataTransfer.files[0].path;
})).map(function(path) {
var image = new Image();
image.src = path;
// note: I return an Observable in this map function
// is this even good practice? if yes, is mergeAll the best
// way to get the "load" event?
return Rx.Observable.fromEvent(image, 'load');
}).mergeAll().map(function(event) {
return event.path[0];
}).subscribe(function(image) {
var canvas = document.createElement('canvas');
var context = canvas.getContext('2d');
canvas.width = image.width;
canvas.height = image.height;
context.drawImage(image, 0, 0);
canvasState.onNext(canvas);
});
(附带问题:"allowed" 到 return Observables 在 map
中吗?)
我的 canvasState
看起来像这样:
var canvasState = new Rx.BehaviorSubject(undefined);
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
如您所见,如果我的 canvas 值为真,我将 canvas 附加到正文。但是每次出现新的 canvas 时,我都想删除旧的。实现这一目标的最佳方法是什么?这样的事情可能吗?
// draw image
canvasState.filter(function(canvas) {
return !!canvas;
}).beforeNext(function(oldCanvas) {
// remove old one
}).subscribe(function drawImage(canvas) {
document.body.appendChild(canvas);
});
嵌套的 Observables
是的,map
操作 return Observable 是正常的。这为您提供了可观察流的可观察流。有 Rx 3 个操作可以将嵌套的可观察对象展平一层:
- mergeAll - 订阅所有到达的内部流。您通常最终会同时订阅多个流,并且通常您的最终流会将来自不同内部流的结果混合在一起。您可以提供
maxConcurrent
参数来限制并发内部订阅的数量。当达到限制时,新的内部 observable 会排队,并且在先前的内部 observable 完成之前不会被订阅。 - concatAll - 按顺序一次订阅每个内部可观察流。您的结果流将以可预测的顺序生成项目。 Concat 只是
Merge
,maxConcurrent 设置为 1。 - switch - 订阅第一个内部流。然后,当新流到达时,"switch" 到它。基本上取消订阅之前的内流,订阅最新的内流。如果只想收听最新的内部流,请使用
switch
。
(在我的代码中,我还实现了第四个扁平化运算符:concatLatest
,类似于 concat
,但是当内部流完成时,而不是跳转到中的下一个流队列,它直接跳转到队列中的最新序列,丢弃任何跳过的内部流。它的行为介于 concat
和 switch
之间,我发现它在处理背压时非常有用仍在产生结果)。
所以,.map(...).mergeAll()
或.map(...).concatAll()
或.map(...).switch()
是很常见的。它是如此常见,以至于有针对这 3 种情况的 Rx 方法:
source.flatMap(selector)
等同于source.map(selector).mergeAll()
source.concatMap(selector)
等同于source.map(selector).concatAll()
source.flatMapLatest(selector)
等同于source.map(selector).switch()
您的代码
根据以上信息,我建议您使用 switch
而不是 merge
。按照您现在的代码,如果有人快速更改值,您可能会同时发出 2 个图像请求,如果它们以错误的顺序完成,您最终会得到错误的最终结果 canvas。 switch
将取消陈旧的图片请求并切换到新的。
请注意,如果用户更改值的速度一直快于图像请求完成的速度,那么他们将永远看不到任何图像,因为您将不断切换到新图像。这是我通常使用我的 concatLatest
的地方,因为它会不时地完成一些中间请求,同时它正在尝试跟上。 YMMV.
清理旧的 canvases
当我的可观察流有一个 map 操作产生必须清理的资源时,我倾向于使用 scan
因为它可以为我跟踪以前的值(不需要我去创建一个闭包变量保持以前的值):
canvasState
.filter(function (c) { return !!c; })
.scan({}, function (acc, canvas) {
return { previous: acc.canvas, canvas: canvas };
})
.subscribe(function (v) {
// remove the old canvas
if (v.previous) { document.body.removeChild(v.previous); }
// add the new one
document.body.appendChild(v.canvas);
});