来自 RxJS 请求流的同步响应流
Synchronous stream of responses from a stream of requests with RxJS
我是 RxJS 的新手,想知道是否有人可以帮助我。
我想从请求流(有效负载数据)中创建同步响应流(最好使用相应的请求)。
我基本上希望请求一个一个发送,每个等待最后一个的响应。
我试过了,但它会立即发送所有内容 (jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
以下在一定程度上有效,但不使用请求数据流 (jsbin)。
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
谢谢!
编辑:
澄清一下,这就是我想要实现的目标:
"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."
按照 user3743222 的建议,使用 concatMap 和 defer 似乎可以做到(jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
尝试在您的第一个代码示例中将 flatMap
替换为 concatMap
,如果结果行为符合您的要求,请告诉我。
responseStream = requestStream.concatMap(//I replaced `flatMap`
sendRequest,
(val, response)=>{ return {val, response}; }
);
基本上 concatMap
与 flatMap
具有相似的签名,行为上的区别在于它将等待当前可观察对象被展平完成,然后再继续下一个。所以在这里:
- 一个
requestStream
值将被推送到concatMap
运算符。
concatMap
运算符将生成一个 sendRequest
可观察对象,并且该可观察对象之外的任何值(似乎是一个元组 (val, response)
)都将通过选择器函数和对象结果将传递给下游
- 当
sendRequest
完成时,将处理另一个 requestStream
值。
- 总之,您的请求将被一一处理
或者,也许您想使用 defer
来推迟 sendRequest
.
的执行
responseStream = requestStream.concatMap(//I replaced `flatMap`
function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
(val, response)=>{ return {val, response}; }
);
我是 RxJS 的新手,想知道是否有人可以帮助我。
我想从请求流(有效负载数据)中创建同步响应流(最好使用相应的请求)。
我基本上希望请求一个一个发送,每个等待最后一个的响应。
我试过了,但它会立即发送所有内容 (jsbin):
var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,
(val, response)=>{ return {val, response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
以下在一定程度上有效,但不使用请求数据流 (jsbin)。
var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val, response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},
err => {
console.err(err);
},
()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
谢谢!
编辑:
澄清一下,这就是我想要实现的目标:
"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."
按照 user3743222 的建议,使用 concatMap 和 defer 似乎可以做到(jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},
(val, response)=>{ return {val, response}; }
);
尝试在您的第一个代码示例中将 flatMap
替换为 concatMap
,如果结果行为符合您的要求,请告诉我。
responseStream = requestStream.concatMap(//I replaced `flatMap`
sendRequest,
(val, response)=>{ return {val, response}; }
);
基本上 concatMap
与 flatMap
具有相似的签名,行为上的区别在于它将等待当前可观察对象被展平完成,然后再继续下一个。所以在这里:
- 一个
requestStream
值将被推送到concatMap
运算符。 concatMap
运算符将生成一个sendRequest
可观察对象,并且该可观察对象之外的任何值(似乎是一个元组(val, response)
)都将通过选择器函数和对象结果将传递给下游- 当
sendRequest
完成时,将处理另一个requestStream
值。 - 总之,您的请求将被一一处理
或者,也许您想使用 defer
来推迟 sendRequest
.
responseStream = requestStream.concatMap(//I replaced `flatMap`
function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
(val, response)=>{ return {val, response}; }
);