了解 rxjs 中的背压 - 仅缓存 5 张等待上传的图像

Understanding back-pressure in rxjs - only cache 5 images waiting for upload

我正在做一个节点项目,需要提交数千张图片进行处理。在将这些图像上传到处理服务器之前,需要调整它们的大小,所以我有一些类似的东西:

imageList
    .map(image => loadAndResizeImage)
    .merge(3)
    .map(image => uploadImage)
    .merge(3)
    .subscribe();

调整图片大小通常需要十分之几秒,上传和处理大约需要 4 秒。

在等待上传队列清除时,如何防止内存中累积数千张调整大小的图像?我可能想要调整 5 张图片的大小并等待上传,以便在图片上传完成后立即从队列中拉出下一张调整大小的图片并上传,并调整新图片的大小并将其添加到 'buffer'.

可以在此处找到问题的说明:

https://jsbin.com/webaleduka/4/edit?js,console

这里有一个加载步骤(耗时 200 毫秒)和一个处理步骤(耗时 4 秒)。每个进程的并发数限制为 2。 我们可以看到,使用 25 个初始项目,我们在内存中得到 20 个图像。

我确实查看了缓冲区选项,但似乎都没有做我想做的事情。

目前我刚刚将加载、调整大小和上传合并到一个延迟的可观察对象中,我将其与最大并发数合并。不过,我想让图片等待上传,我相信这一定是可能的。

我正在使用 RXjs 4,但我想 5 的原理是一样的。

非常感谢。

在 RxJS 5 中我会这样做:

Observable.range(1, 25)
    .bufferCount(5)
    .concatMap(batch => { // process images
        console.log('process', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('p' + val).delay(300))
            .toArray();
    })
    .concatMap(batch => { // send images
        console.log('send batch', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('s' + val).delay(500))
            .toArray();
    })
    .subscribe(val => {
        // console.log('response');
        console.log('response', val);

    });

使用 bufferCount 运算符,我将输入数组分成 5 个项目的批次。然后每个批次首先用第一个 concatMap() 处理(我故意使用 concat 因为我想等到嵌套的 Observable 完成)。然后将处理后的数据发送到另一个 concatMap(),后者将其发送到您的服务器。

我正在使用两个 delay() 运算符来模拟不同的任务需要不同的时间。在我们的案例中,处理图像非常快,所以第一个 concatMap 会比第二个 concatMap 更快地发送项目,这没问题。处理后的图片会堆叠在concatMap里面,会陆续发送。

此演示的输出如下所示:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

观看现场演示:https://jsbin.com/mileqa/edit?js,console

然而,如果你想总是先处理一个批次而不是发送它,当它被发送而不是继续处理另一个批次时,你必须将第二个内部 Observable 从 concatMap 移动到 toArray() 在第一个 concatMap() 调用中。

.concatMap(batch => { // process images
    console.log('process', batch);
    return Observable.from(batch)
        .mergeMap(val => Observable.of('p' + val).delay(100))
        .toArray()
        .concatMap(batch => { // send images
            console.log('send batch', batch);
            return Observable.from(batch)
                .mergeMap(val => Observable.of('s' + val).delay(500))
                .toArray();
        });
})

观看现场演示:https://jsbin.com/sabena/2/edit?js,console

这会产生如下输出:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

可以看到"process"、"send batch"和"response"日志是有序的。

RxJS 4 中的实现应该几乎相同(只是运算符名称可能略有不同)。

在 RxJS 4 中还有 controlled() operator 在 RxJS 5 中不存在(还没有?)。我可能会做一些与您需要的非常相似的事情。

我认为我已经通过使用 controlled() rxjs 运算符解决了这个问题:

var queuedImages = 0;

var imageSource = Rx.Observable.range(1, 25)
  .map(index => "image_" + index)
  .controlled();

imageSource
  .map(image => loadImage(image))
  .merge(2)
  .do((image) => {
    queuedImages++;
    console.log(`Images waiting for processing: ${queuedImages}`);
  })
  .map(image => processImage(image))
  .merge(2)
  .do( () => {
    queuedImages--;
    console.log(`Images waiting for processing: ${queuedImages}`);

    if(queuedImages < 4){
      console.log(`requesting more image loads`);
      imageSource.request(4-queuedImages);
    }
  })
  .subscribe( 
    (item) => {}, null, 
    () => console.log(`All Complete`) );

imageSource.request(4);

最初请求 4 张图像。这些是从光盘加载然后处理的。随着每个图像的加载和处理,内存中的图像数量将使用 queuedImages 变量进行跟踪。当此数字低于 4 时,将请求更多图像。

可以在这里看到它的 jsbin:

https://jsbin.com/webaleduka/11/edit?js,console

此方法意味着缓存中的图片永远不会超过 6 张左右,并确保缓存中始终有足够的图片等待上传。