来自 RxJS 请求流的同步响应流

Synchronous stream of responses from a stream of requests with RxJS

我是 RxJS 的新手,想知道是否有人可以帮助我。

我想从请求流(有效负载数据)中创建同步响应流(最好使用相应的请求)。

我基本上希望请求一个一个发送,每个等待最后一个的响应。

我试过了,但它会立即发送所有内容 (jsbin):

var requestStream, responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);

responseStream = requestStream.flatMap(
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('result for '+val);},1000);
  });
};

以下在一定程度上有效,但不使用请求数据流 (jsbin)。

var data, responseStream;
data = ['a','b','c','d','e'];
responseStream = Rx.Observable.create(observer=>{
  var sendNext = function(){
    var val = data.shift();
    if (!val) {
      observer.onCompleted();
      return;
    }
    sendRequest(val).then(response=>{
      observer.onNext({val, response});
      sendNext();
    });
  };
  sendNext();
});

responseStream.subscribe(
  item=>{
    console.log(item);
  },
  err => {
    console.err(err);
  },
  ()=>{
    console.log('Done');
  }
);

function sendRequest(val) {
  return new Promise((resolve,reject)=>{
    setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
  });
};

谢谢!

编辑:

澄清一下,这就是我想要实现的目标:

"Send A, when you receive response for A, send B, when you receive response for B, send C, etc..."

按照 user3743222 的建议,使用 concatMap 和 defer 似乎可以做到(jsbin):

responseStream = requestStream.concatMap(
  (val)=>{
    return Rx.Observable.defer(()=>{
      return sendRequest(val);
    });
  },
  (val, response)=>{ return {val, response}; }
);

尝试在您的第一个代码示例中将 flatMap 替换为 concatMap,如果结果行为符合您的要求,请告诉我。

responseStream = requestStream.concatMap(//I replaced `flatMap`
  sendRequest,
  (val, response)=>{ return {val, response}; }
);

基本上 concatMapflatMap 具有相似的签名,行为上的区别在于它将等待当前可观察对象被展平完成,然后再继续下一个。所以在这里:

  • 一个requestStream值将被推送到concatMap运算符。
  • concatMap 运算符将生成一个 sendRequest 可观察对象,并且该可观察对象之外的任何值(似乎是一个元组 (val, response))都将通过选择器函数和对象结果将传递给下游
  • sendRequest 完成时,将处理另一个 requestStream 值。
  • 总之,您的请求将被一一处理

或者,也许您想使用 defer 来推迟 sendRequest.

的执行
responseStream = requestStream.concatMap(//I replaced `flatMap`
  function(x){return Rx.Observable.defer(function(){return sendRequest(x);})},
  (val, response)=>{ return {val, response}; }
);