Rxjs:使用来自另一个可观察对象的数据更新可观察流中的值,返回单个可观察流

Rxjs: updating values in observable stream with data from another observable, returning a single observable stream

背景

我正在尝试从 Stash Rest Api 拉取请求构建一个可观察的值流。不幸的是,有关 PR 是否存在合并冲突的信息可在合并列表的不同端点获得。

打开的拉取请求列表可见于,比方说,http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests

对于每个 PR,有关合并冲突的数据可见于 http://my.stash.com/rest/api/1.0/projects/myproject/repos/myrepo/pull-requests/[PR-ID]/merge

使用 atlas-stash 包,我可以创建和订阅可观察的拉取请求流(每秒更新一次):

let pullRequestsObs = Rx.Observable.create(function(o) {
    stash.pullRequests(project, repo)
        .on('error', function(error) {o.onError(error)})
        .on('allPages', function(data) {
            o.onNext(data);
            o.onCompleted();
        });
    });

let pullRequestStream = pullRequestsObs
    .take(1)
    .merge(
        Rx.Observable
            .interval(1000)
            .flatMapLatest(pullRequestsObs)
    );

pullRequestsStream.subscribe(
    (data) => {
        console.log(data)
        // do something with data
    },
    (error) => log.error(error),
    () => log.info('done')
);

这符合我的期望。最后,pullRequestsStream 是一个可观察对象,其值是 JSON 个对象的列表。

我的目标

我希望更新 pullRequestsStream 值,以便列表的每个元素都包含来自 [PR-ID]/merge api.

的信息

我认为这可以在 pullRequestsStream 上使用 map 来实现,但我没有成功。

let pullRequestWithMergeStream = pullRequestStream.map(function(prlist) {
    _.map(prlist, function(pr) {
        let mergeObs = Rx.Observable.create(function(o) {
            stash.pullRequestMerge(project, repo, pr['id'])
                .on('error', function(error) {o.onError(error)})
                .on('newPage', function(data) {
                    o.onNext(data);
                    o.onCompleted();
                }).take(1);
        });

        mergeObs.subscribe(
            (data) => {
                pr['merge'] = data;
                return pr; // this definitely isn't right
            },
            (error) => log.error(error),
            () => log.info('done')
        );
    });
});

通过一些日志记录,我可以看到拉取请求和合并 api 都被正确命中,但是当我订阅 pullRequestWithMergeStream 时,我 获取未定义的值。

map 中的 subscribe 步骤中使用 return 不起作用(并且看起来不应该)但我无法弄清楚什么 pattern/idiom 会实现我想要的。

有正确的方法吗?我是不是完全走错了路?

tl;博士

我可以使用来自 不同的可观察对象 的信息更新来自 Rxjs.Observable 的值吗?

您可以使用 flatMapconcatMap 让一项任务触发另一项任务。您可以使用 forkJoin 请求并行合并并将结果收集在一个地方。它没有经过测试,但应该是这样的:

pullRequestStream.concatMap(function (prlist){
  var arrayRequestMerge = prlist.map(function(pr){
    return Rx.Observable.create(function(o) {...same as your code});
  });
  return Rx.Observable.forkJoin(arrayRequestMerge)
         .do(function(arrayData){
               prlist.map(function(pr, index){pr['merge']=arrayData[index]
             })})
         .map(function(){return prlist})
})

PS :我认为 prlist 是一个数组。

更新 根据您的评论,这是一个 运行 仅 maxConcurrent 并行调用的版本。

pullRequestStream.concatMap(function (prlist){
  var arrayRequestMerge = prlist.map(function(pr, index){
    return Rx.Observable.create(function(o) {
        stash.pullRequestMerge(project, repo, pr['id'])
            .on('error', function(error) {o.onError(error)})
            .on('newPage', function(data) {
                o.onNext({data: data, index : index});
                o.onCompleted();
            }).take(1);
    });
  });
  var maxConcurrent = 2;
  Rx.Observable.from(arrayRequestMerge)
    .merge(maxConcurrent)
    .do(function(obj){
               prlist[obj.index]['merge'] = obj.data
             })})
    .map(function(){return prlist})
})