Rx.js:从承诺列表中获取所有结果作为数组

Rx.js: get all results as array from promise lists

这是演示:

var availableNews$ = Rx.Observable.fromPromise(fetch('https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty').then(res => res.json()));
var pager$ = new Rx.Subject();
var fetchedNews$ = Rx.Observable.combineLatest(availableNews$, pager$, (news, pager) => news.slice((pager.current - 1) * pager.items_per_page, pager.items_per_page))
    .flatMap(id => id)
    .map(id => Rx.Observable.fromPromise(fetch(`https://hacker-news.firebaseio.com/v0/item/${id}.json?print=pretty`).then(res => res.json()))
    .concatMap(res => res));
pager$.subscribe(v => {
  console.log(v);
  document.querySelector('#pager').textContent = JSON.stringify(v);
});
fetchedNews$.subscribe(x => { 
  console.log(x);
  document.querySelector('#news').textContent = JSON.stringify(x);;
})
pager$.next({current: 1, items_per_page: 10})
<script src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js"></script>
pager: <p id="pager"></p>
news: <p id="news"></p>
<p>the news return obserable instance instead of <em>array of fetched news</em> which is desired</p>

我在代码中做了这些事情:

1,获取热门新闻提要 (availableNews$)

2,使用 pager 来限制应该获取哪些提要 (pager$)

3、获取新闻应该是一个数组(fetchedNews$)

但是我在第 3 步就搞砸了,返回的结果是每个 promiseObserable 的流,而不是每个 promise 连接成数组的结果。

感谢帮助^_^

--已编辑--

我在 github 上以 issue 的形式提出这个问题,并得到了更好的解决方案。这是代码:

const { Observable } = Rx;
const { ajax: { getJSON }} = Observable;

var availableNews$ = getJSON('https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty');
var pager$ = new Rx.Subject();
var fetchedNews$ = Observable
  .combineLatest(availableNews$, pager$, (news, {current, items_per_page}) => {
    return news.slice((current - 1) * items_per_page, items_per_page);
  })
  .switchMap((ids) => Observable.forkJoin(ids.map((id) => {
    return getJSON(`https://hacker-news.firebaseio.com/v0/item/${id}.json?print=pretty`);
  })));

pager$.subscribe(v => {
  document.querySelector('#pager').textContent = JSON.stringify(v);
});

fetchedNews$.subscribe(stories => {
   document.querySelector('#news').textContent = JSON.stringify(stories);  
});

pager$.next({current: 1, items_per_page: 10})

您有两个或三个错误的组合。

首先,您 combineLatest() 中的 lambda 不对新闻 ID 数组进行操作,而是对包含该数组的单个项目进行操作。因此 flatmap()concatMap().

一样是不必要的

其次,您不需要从 fetch() 创建 Observable,因为

第三,您拥有的 combineLatest() 没有提供在加载新闻时完成的 Promise,而是在加载 availableNews 和 pager 时完成的 Promise。因此,您必须使用 combineLatest() 创建第三个 Observable,但是来自 fetch() Promise 数组。然后,您在第二个 subscribe() 中订阅它,如下所示:

var availableNews$ = Rx.Observable.fromPromise(fetch('https://hacker-news.firebaseio.com/v0/topstories.json?print=pretty').then(res => res.json()));
var pager$ = new Rx.Subject();
var fetchedNews$ = Rx.Observable.combineLatest(availableNews$, pager$, (news, pager) =>
  Rx.Observable.combineLatest.apply(this, news.slice((pager.current - 1) * pager.items_per_page, pager.items_per_page)
    .map(id => fetch(`https://hacker-news.firebaseio.com/v0/item/${id}.json?print=pretty`).then(res => res.json()))));

pager$.subscribe(v => {
  document.querySelector('#pager').textContent = JSON.stringify(v);
});

fetchedNews$.subscribe(x => { 
  x.subscribe(a => {
    document.querySelector('#news').textContent = JSON.stringify(a);
  });
})
pager$.next({current: 1, items_per_page: 10})
<script src="https://unpkg.com/@reactivex/rxjs/dist/global/Rx.js"></script>
pager: <p id="pager"></p>
news: <p id="news"></p>
<p>the news return obserable instance instead of <em>array of fetched news</em> which is desired</p>