使用 RxJS5 设置速率
Set rate using RxJS5
我有这段代码,它只是从 .csv 文件中读取数据并将其转换为 json 并记录数据:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
})
const obs = Rx.Observable.fromEvent(dest, 'data')
.flatMap(d => Rx.Observable.timer(100).mapTo(d))
obs.subscribe(v => {
console.log(String(v));
})
代码所做的是在 100 毫秒延迟后记录所有数据。 我实际上想延迟每一行数据并在一小段延迟后记录每一行。
以上代码无法实现 - 控制数据记录速率的最佳方法是什么?
假设:所有数据行大约同时进入,所以都延迟了 100 毫秒,所以它们最终几乎同时被打印出来。我只需要在记录上一行之后开始延迟下一行。
下面的代码似乎与使用上面的计时器做同样的事情:
const obs = Rx.Observable.fromEvent(dest, 'data')
.delay(100)
我在 RxJS 库中找不到我需要的功能(虽然它可能在那里,但我就是找不到,如果有更好、更惯用的方法,请告诉我)。
所以我写了这个,它似乎完成了工作:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const p = Rx.Observable.prototype;
p.eachWait = function(timeout){
const source = this;
const values = [];
let flipped = true;
const onNext = function (sub){
flipped = false;
setTimeout(() => {
var c = values.pop();
if(c) sub.next(c);
if(values.length > 0){
onNext(sub);
}
else{
flipped = true;
}
}, timeout);
}
return Rx.Observable.create(sub => {
return source.subscribe(
function next(v){
values.unshift(v);
if(flipped){
onNext(sub);
}
},
sub.error.bind(sub),
sub.complete.bind(sub)
);
});
}
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
});
const obs = Rx.Observable.fromEvent(dest, 'data')
.eachWait(1000)
obs.subscribe(v => {
console.log(String(v));
});
我认为这已达到您所能达到的最佳性能 - 在任何给定时刻只有一个计时器应该 运行。
Hypothesis: All the lines of data come in approximately at the same
time, so all are delayed 100 ms, so they end up getting printed at
pretty much the same time. I need to only start delaying the next line
after the previous as been logged.
您的假设正确
解决方案
用 .concatMap()
替换原始解决方案中的 .flatMap()
Rx.Observable.from([1,2,3,4])
.mergeMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('mergeMap value: ' + val));
Rx.Observable.from([1,2,3,4])
.concatMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('concatMap value: ' + val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
这将确保每次发射都在订阅下一次发射并开始延迟其值之前完成。
我有这段代码,它只是从 .csv 文件中读取数据并将其转换为 json 并记录数据:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
})
const obs = Rx.Observable.fromEvent(dest, 'data')
.flatMap(d => Rx.Observable.timer(100).mapTo(d))
obs.subscribe(v => {
console.log(String(v));
})
代码所做的是在 100 毫秒延迟后记录所有数据。 我实际上想延迟每一行数据并在一小段延迟后记录每一行。
以上代码无法实现 - 控制数据记录速率的最佳方法是什么?
假设:所有数据行大约同时进入,所以都延迟了 100 毫秒,所以它们最终几乎同时被打印出来。我只需要在记录上一行之后开始延迟下一行。
下面的代码似乎与使用上面的计时器做同样的事情:
const obs = Rx.Observable.fromEvent(dest, 'data')
.delay(100)
我在 RxJS 库中找不到我需要的功能(虽然它可能在那里,但我就是找不到,如果有更好、更惯用的方法,请告诉我)。
所以我写了这个,它似乎完成了工作:
const fs = require('fs');
const path = require('path');
const sd = path.resolve(__dirname + '/fixtures/SampleData.csv');
const strm = fs.createReadStream(sd).setEncoding('utf8');
const Rx = require('rxjs/Rx');
const csv2json = require('csv2json');
const p = Rx.Observable.prototype;
p.eachWait = function(timeout){
const source = this;
const values = [];
let flipped = true;
const onNext = function (sub){
flipped = false;
setTimeout(() => {
var c = values.pop();
if(c) sub.next(c);
if(values.length > 0){
onNext(sub);
}
else{
flipped = true;
}
}, timeout);
}
return Rx.Observable.create(sub => {
return source.subscribe(
function next(v){
values.unshift(v);
if(flipped){
onNext(sub);
}
},
sub.error.bind(sub),
sub.complete.bind(sub)
);
});
}
const dest = strm
.pipe(csv2json({
separator: ','
}));
dest.on('error', function(e){
console.error(e.stack || e);
});
const obs = Rx.Observable.fromEvent(dest, 'data')
.eachWait(1000)
obs.subscribe(v => {
console.log(String(v));
});
我认为这已达到您所能达到的最佳性能 - 在任何给定时刻只有一个计时器应该 运行。
Hypothesis: All the lines of data come in approximately at the same time, so all are delayed 100 ms, so they end up getting printed at pretty much the same time. I need to only start delaying the next line after the previous as been logged.
您的假设正确
解决方案
用 .concatMap()
.flatMap()
Rx.Observable.from([1,2,3,4])
.mergeMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('mergeMap value: ' + val));
Rx.Observable.from([1,2,3,4])
.concatMap(i => Rx.Observable.timer(500).mapTo(i))
.subscribe(val => console.log('concatMap value: ' + val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
这将确保每次发射都在订阅下一次发射并开始延迟其值之前完成。