使用 RxJS5 设置速率

Set rate using RxJS5

我有这段代码,它只是从 .csv 文件中读取数据并将其转换为 json 并记录数据:

const fs = require('fs');
const path = require('path');

const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');

const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');


const dest = strm
  .pipe(csv2json({
    separator: ','
  }));

dest.on('error', function(e){
    console.error(e.stack || e);
})

const obs = Rx.Observable.fromEvent(dest, 'data')
          .flatMap(d => Rx.Observable.timer(100).mapTo(d))

obs.subscribe(v => {
    console.log(String(v));
})

代码所做的是在 100 毫秒延迟后记录所有数据。 我实际上想延迟每一行数据并在一小段延迟后记录每一行。

以上代码无法实现 - 控制数据记录速率的最佳方法是什么?

假设:所有数据行大约同时进入,所以都延迟了 100 毫秒,所以它们最终几乎同时被打印出来。我只需要在记录上一行之后开始延迟下一行。

下面的代码似乎与使用上面的计时器做同样的事情:

const obs = Rx.Observable.fromEvent(dest, 'data')
      .delay(100)

我在 RxJS 库中找不到我需要的功能(虽然它可能在那里,但我就是找不到,如果有更好、更惯用的方法,请告诉我)。

所以我写了这个,它似乎完成了工作:

const fs = require('fs');
const path = require('path');

const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');

const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');

const p = Rx.Observable.prototype;

p.eachWait = function(timeout){

    const source = this;
    const values = [];
    let flipped = true;

    const onNext = function (sub){

          flipped = false;

          setTimeout(() => {

            var c = values.pop();
            if(c)  sub.next(c);

            if(values.length > 0){
               onNext(sub);
            }
            else{
               flipped = true;
            }

         }, timeout);
    }

      return Rx.Observable.create(sub => {

          return source.subscribe(

                function next(v){

                         values.unshift(v);

                         if(flipped){
                             onNext(sub);
                         }

                 },
              sub.error.bind(sub),
              sub.complete.bind(sub)
          );

      });

}


const dest = strm
  .pipe(csv2json({
    separator: ','
  }));

dest.on('error', function(e){
    console.error(e.stack || e);
});

const obs = Rx.Observable.fromEvent(dest, 'data')
      .eachWait(1000)

obs.subscribe(v => {
  console.log(String(v));
});

我认为这已达到您所能达到的最佳性能 - 在任何给定时刻只有一个计时器应该 运行。

Hypothesis: All the lines of data come in approximately at the same time, so all are delayed 100 ms, so they end up getting printed at pretty much the same time. I need to only start delaying the next line after the previous as been logged.

您的假设正确

解决方案

.concatMap()

替换原始解决方案中的 .flatMap()

Rx.Observable.from([1,2,3,4])
  .mergeMap(i => Rx.Observable.timer(500).mapTo(i))
  .subscribe(val => console.log('mergeMap value: ' + val));

Rx.Observable.from([1,2,3,4])
  .concatMap(i => Rx.Observable.timer(500).mapTo(i))
  .subscribe(val => console.log('concatMap value: ' + val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

这将确保每次发射都在订阅下一次发射并开始延迟其值之前完成。