将多个异步 Observable<List> 减少为一个 Observable<List>
Reduce many async Observable<List> into one Observable<List>
所以这就是我正在尝试做的事情:
对于每个事件,我想调用一些 Service.getList()
10 次,然后将 10 个列表合并为一个。
现在我尝试了这两种方法,它们都在 UT 中工作但在真实应用程序中失败(我猜我在真实应用程序中给定异步 http 调用时我没有正确执行 reduce 操作)。
对于这两种情况,我都看不到 reduce()
、onNext()
和 onError()
之后的日志,所以我猜测 reduce()
操作没有完成。
尝试 #1:
public Observable<List<Event>> getEventsForLocation(Location location) {
List<Observable<List<Event>>> obs = new ArrayList<>();
for (Venue v : location.getVenues()) {
obs.add(getEventsForVenue(v)); //does one http call, returns Observable<List<Event>>
}
return Observable.concat(Observable.from(obs))
.reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
events.addAll(events2);
return events;
})
.doOnNext(events -> Log.d("reduce ", events.toString()))
.doOnError(throwable -> Log.e("reduce error", throwable.toString()));}
尝试#2:
public Observable<List<Event>> getEventsForLocation(Location location) {
return Observable
.from(location.getVenues())
.flatMap(venue -> getEventsForVenue(venue)) //does one http call, returns Observable<List<Event>>
.reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
events.addAll(events2);
return events;
})
.doOnNext(events -> Log.d("service", "total events " + events.toString()))
.doOnError(t -> Log.e("service", "total events error2 " + t.toString()));}
两种方法都通过的 UT:
@Test
public void getEventsForLocation() {
Location loc = new Location("test", newArrayList(new Venue("v1", "url1"),new Venue("v2", "url2")));
when(httpGateway.downloadWebPage(Mockito.anyString())).thenReturn(
Observable.just(readResource("eventsForVenue1.html")),
Observable.just(readResource("eventsForVenue2.html"))
);
TestSubscriber<List<Event>> probe = new TestSubscriber<>();
service.getEventsForLocation(loc).subscribe(probe);
probe.assertNoErrors();
//assert the next event containts contents of all lists
List<Event> events = probe.getOnNextEvents().get(0);
//first list
Assert.assertEquals("Unexpected title", "event1", events.get(0).getName());
Assert.assertEquals("Unexpected artist", "artist1", events.get(0).getArtist());
//second list
Assert.assertEquals("Unexpected title", "event2", events.get(1).getName());
Assert.assertEquals("Unexpected artist", "artist2", events.get(1).getArtist());
}
更新
这是更完整的代码,带有调度程序。
Observable
.just(loc)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.flatMap(location -> service.getEventsForLocation(location))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver();
您可以使用 Collect 来解决问题。但是在我合并多个项目的这种情况下,通常我更愿意使用 Scan。对于每个新项目,它都会为您提供最后处理的项目。因此,您可以将每个项目附加到之前的项目。
看看弹珠图是否适合你的情况
http://reactivex.io/documentation/operators/scan.html
尝试从这个简单的示例开始,然后尝试将其应用到您的代码中
/**
* apply this function for every item against the previous emitted item from the source.
* Emitted:
0
1
3
6
10
15
*/
@Test
public void scanObservable() {
Integer[] numbers = {0, 1, 2, 3, 4, 5};
Observable.from(numbers)
.scan((lastItemEmitted, newItem) -> (lastItemEmitted + newItem))
.subscribe(System.out::println);
}
我终于找到了一个解决方案,使用 zip() 但我不喜欢它。
问题应该可以通过 flatMap/reduce
的组合来解决
public Observable<List<Event>> getEventsForLocation(Location location) {
List<Observable<List<Event>>> venues = new ArrayList<>();
for (Venue v : location.getVenues()) {
venues.add(Observable.just(v).flatMap(venue -> getEventsForVenue(venue)));
}
return Observable.zip(venues, new FuncN<List<Event>>() {
@Override
public List<Event> call(Object... args) {
List<Event> allEvents = new ArrayList<Event>();
for (Object o : args) {
List<Event> le = (List<Event>) o;
allEvents.addAll(le);
}
return allEvents;
}
});
如果你不关心列表中的最后顺序,你可以直接使用from
+ flatMap
+ flatMapIterable
+ toList
:
Observable.from(location.getVenues())
.flatMap(venue -> getEventsForVenue(venue))
.flatMapIterable(list -> list)
.toList();
如果顺序很重要并且您想在 "parallel" 中执行 getEventsForVenue,您可以将 flatMap 替换为 concatMapEager:
Observable.from(location.getVenues())
.concatMapEager(venue -> getEventsForVenue(venue))
.concatMapIterable(list -> list)
.toList();
所以这就是我正在尝试做的事情:
对于每个事件,我想调用一些 Service.getList()
10 次,然后将 10 个列表合并为一个。
现在我尝试了这两种方法,它们都在 UT 中工作但在真实应用程序中失败(我猜我在真实应用程序中给定异步 http 调用时我没有正确执行 reduce 操作)。
对于这两种情况,我都看不到 reduce()
、onNext()
和 onError()
之后的日志,所以我猜测 reduce()
操作没有完成。
尝试 #1:
public Observable<List<Event>> getEventsForLocation(Location location) {
List<Observable<List<Event>>> obs = new ArrayList<>();
for (Venue v : location.getVenues()) {
obs.add(getEventsForVenue(v)); //does one http call, returns Observable<List<Event>>
}
return Observable.concat(Observable.from(obs))
.reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
events.addAll(events2);
return events;
})
.doOnNext(events -> Log.d("reduce ", events.toString()))
.doOnError(throwable -> Log.e("reduce error", throwable.toString()));}
尝试#2:
public Observable<List<Event>> getEventsForLocation(Location location) {
return Observable
.from(location.getVenues())
.flatMap(venue -> getEventsForVenue(venue)) //does one http call, returns Observable<List<Event>>
.reduce((List<Event>) new ArrayList<Event>(), (events, events2) -> {
events.addAll(events2);
return events;
})
.doOnNext(events -> Log.d("service", "total events " + events.toString()))
.doOnError(t -> Log.e("service", "total events error2 " + t.toString()));}
两种方法都通过的 UT:
@Test
public void getEventsForLocation() {
Location loc = new Location("test", newArrayList(new Venue("v1", "url1"),new Venue("v2", "url2")));
when(httpGateway.downloadWebPage(Mockito.anyString())).thenReturn(
Observable.just(readResource("eventsForVenue1.html")),
Observable.just(readResource("eventsForVenue2.html"))
);
TestSubscriber<List<Event>> probe = new TestSubscriber<>();
service.getEventsForLocation(loc).subscribe(probe);
probe.assertNoErrors();
//assert the next event containts contents of all lists
List<Event> events = probe.getOnNextEvents().get(0);
//first list
Assert.assertEquals("Unexpected title", "event1", events.get(0).getName());
Assert.assertEquals("Unexpected artist", "artist1", events.get(0).getArtist());
//second list
Assert.assertEquals("Unexpected title", "event2", events.get(1).getName());
Assert.assertEquals("Unexpected artist", "artist2", events.get(1).getArtist());
}
更新
这是更完整的代码,带有调度程序。
Observable
.just(loc)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.flatMap(location -> service.getEventsForLocation(location))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver();
您可以使用 Collect 来解决问题。但是在我合并多个项目的这种情况下,通常我更愿意使用 Scan。对于每个新项目,它都会为您提供最后处理的项目。因此,您可以将每个项目附加到之前的项目。
看看弹珠图是否适合你的情况 http://reactivex.io/documentation/operators/scan.html
尝试从这个简单的示例开始,然后尝试将其应用到您的代码中
/**
* apply this function for every item against the previous emitted item from the source.
* Emitted:
0
1
3
6
10
15
*/
@Test
public void scanObservable() {
Integer[] numbers = {0, 1, 2, 3, 4, 5};
Observable.from(numbers)
.scan((lastItemEmitted, newItem) -> (lastItemEmitted + newItem))
.subscribe(System.out::println);
}
我终于找到了一个解决方案,使用 zip() 但我不喜欢它。
问题应该可以通过 flatMap/reduce
public Observable<List<Event>> getEventsForLocation(Location location) {
List<Observable<List<Event>>> venues = new ArrayList<>();
for (Venue v : location.getVenues()) {
venues.add(Observable.just(v).flatMap(venue -> getEventsForVenue(venue)));
}
return Observable.zip(venues, new FuncN<List<Event>>() {
@Override
public List<Event> call(Object... args) {
List<Event> allEvents = new ArrayList<Event>();
for (Object o : args) {
List<Event> le = (List<Event>) o;
allEvents.addAll(le);
}
return allEvents;
}
});
如果你不关心列表中的最后顺序,你可以直接使用from
+ flatMap
+ flatMapIterable
+ toList
:
Observable.from(location.getVenues())
.flatMap(venue -> getEventsForVenue(venue))
.flatMapIterable(list -> list)
.toList();
如果顺序很重要并且您想在 "parallel" 中执行 getEventsForVenue,您可以将 flatMap 替换为 concatMapEager:
Observable.from(location.getVenues())
.concatMapEager(venue -> getEventsForVenue(venue))
.concatMapIterable(list -> list)
.toList();