使用 RxJava 更新和保留现有数据
Update and retain existing data with RxJava
我正在考虑将我的 (Android) 应用程序中的一些逻辑转换为使用 RxJava,但是我正在努力想出一种方法来执行一些更高级的逻辑。
用例如下:我想向用户展示一种Feed。 Feed 由来自不同来源的项目组成,例如消息、文章等。由于 API 限制,应用程序本身必须收集各个来源并将它们显示在一个列表中。
例如,假设我在 Feed 中的项目如下:
class FeedItem {
Type feedItem; // Type of the item, e.g. article, message, etc.
...
}
目前,提要建立在一个单独的线程上,当提要更新时,UI 会使用侦听器通知。为了让您了解它是如何完成的,下面是一些(伪)Java 代码(为清楚起见,省略了线程和其他管理代码)。
class FeedProducer {
List<FeedItem> currentData = new ArrayList();
public void refreshData() {
for (FeedSource source: getFeedSources()) {
Type sourceType = source.getType();
// Remove existing items
currentData.removeIf(item -> item.feedItem.equals(sourceType));
List<FeedItem> newItems = source.produceItems();
// Add the new items
currentData.addAll(newItems);
// Notify the UI things have changed
notifyDataChanged(currentData);
}
// Notify the UI we are done loading
notifyLoadingComplete();
}
}
每次用户想要刷新数据时都会调用此方法refreshData()
。这样,可以只更新一些源,而其他源保持不变(例如,通过更改 getFeedSources()
的 return 值)。
这些来源也在应用程序的其他部分单独使用;我在那里将它们转换为 Observables。这使事情变得容易得多,例如如果数据库发生变化,变化会被 Observable 简单地推送到 UI。
因此,我的问题是如何(优雅地)将这些 Observable 源合并到一个 Observable 中,但其中存在先前结果的 "global" 状态。我研究了各种组合运算符,但没有找到我需要的。如果我忽略了一些明显的东西,我深表歉意,因为我对 RxJava.
还很陌生
如果您有 3 个单独的任务 return [0, 1]
、[10, 11]
和 [20, 21]
,您想将它们合并到一个列表中。在这种情况下,您可以使用zip
操作。
public class TestRx {
public static void main(String[] args) {
// some individual observables.
Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1));
Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11));
Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21));
Observable.zip(observable1, observable2, observable3,
new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
// TODO: Remove existing items
// merge all lists
List<Integer> mergedList = new ArrayList<>();
mergedList.addAll(list1);
mergedList.addAll(list2);
mergedList.addAll(list3);
return mergedList;
}
})
.subscribe(new Observer<List<Integer>>() {
@Override
public void onNext(List<Integer> mergedList) {
System.out.println(mergedList);
// TODO: notifyDataChanged(mergedList)
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.toString());
// TODO: handle exceptions
}
@Override
public void onCompleted() {
// TODO: notifyLoadingComplete()
}
});
}
}
结果,它打印成这样[0, 1, 10, 11, 20, 21]
。
天真的选项是,调用者存储最后一个列表并在您请求新数据时将其作为参数提供:
public class ReactiveMultipleSources {
// region Classes
public enum SourceType {
TYPE_ARTICLE,
TYPE_MESSAGE,
TYPE_VIDEO
}
public static class Feed {
private SourceType sourceType;
private String content;
Feed(SourceType sourceType, String content) {
this.sourceType = sourceType;
this.content = content;
}
SourceType getSourceType() {
return sourceType;
}
}
// endregion
public static void main(String[] args) throws InterruptedException {
final List<Feed>[] currentList = new List[]{new ArrayList()};
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
System.out.println();
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
}
private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) {
return Observable.fromIterable(getSourceTypes())
.observeOn(Schedulers.io())
// Get List<Feed> forEach sourceType
.concatMap(ReactiveMultipleSources::getFeedItemsBySourceType)
.observeOn(Schedulers.computation())
// Get list of "List of Feed for sourceType", = List<List<Feed>>
.toList()
.map(lists -> {
for (List<Feed> list : lists) {
SourceType sourceType = list.get(0).getSourceType();
// Remove items of currentFeed whose sourceType has new List<Feed>
currentFeed.removeIf(temp -> temp.getSourceType() == sourceType);
// Add new items
currentFeed.addAll(list);
}
return currentFeed;
})
.toObservable();
}
// region Helper
private static List<SourceType> getSourceTypes() {
return new ArrayList<>(Arrays.asList(SourceType.values()));
}
private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) {
String content;
if (sourceType == SourceType.TYPE_ARTICLE)
content = "article ";
else if (sourceType == SourceType.TYPE_MESSAGE)
content = "message ";
else if (sourceType == SourceType.TYPE_VIDEO)
content = "video ";
else
content = "article ";
Feed feed1 = new Feed(sourceType, content + createRandomInt());
Feed feed2 = new Feed(sourceType, content + createRandomInt());
Feed feed3 = new Feed(sourceType, content + createRandomInt());
Feed feed4 = new Feed(sourceType, content + createRandomInt());
return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4));
}
// For simulating different items each time List<Feed> is required
private static int createRandomInt() {
return ThreadLocalRandom.current().nextInt(0, 21);
}
// endregion
}
示例输出:
article 19
article 15
article 18
article 18
message 3
message 2
message 9
message 1
video 19
video 17
video 18
video 11
article 0
article 4
article 18
article 15
message 11
message 16
message 16
message 4
video 1
video 7
video 20
video 2
我正在考虑将我的 (Android) 应用程序中的一些逻辑转换为使用 RxJava,但是我正在努力想出一种方法来执行一些更高级的逻辑。
用例如下:我想向用户展示一种Feed。 Feed 由来自不同来源的项目组成,例如消息、文章等。由于 API 限制,应用程序本身必须收集各个来源并将它们显示在一个列表中。
例如,假设我在 Feed 中的项目如下:
class FeedItem {
Type feedItem; // Type of the item, e.g. article, message, etc.
...
}
目前,提要建立在一个单独的线程上,当提要更新时,UI 会使用侦听器通知。为了让您了解它是如何完成的,下面是一些(伪)Java 代码(为清楚起见,省略了线程和其他管理代码)。
class FeedProducer {
List<FeedItem> currentData = new ArrayList();
public void refreshData() {
for (FeedSource source: getFeedSources()) {
Type sourceType = source.getType();
// Remove existing items
currentData.removeIf(item -> item.feedItem.equals(sourceType));
List<FeedItem> newItems = source.produceItems();
// Add the new items
currentData.addAll(newItems);
// Notify the UI things have changed
notifyDataChanged(currentData);
}
// Notify the UI we are done loading
notifyLoadingComplete();
}
}
每次用户想要刷新数据时都会调用此方法refreshData()
。这样,可以只更新一些源,而其他源保持不变(例如,通过更改 getFeedSources()
的 return 值)。
这些来源也在应用程序的其他部分单独使用;我在那里将它们转换为 Observables。这使事情变得容易得多,例如如果数据库发生变化,变化会被 Observable 简单地推送到 UI。
因此,我的问题是如何(优雅地)将这些 Observable 源合并到一个 Observable 中,但其中存在先前结果的 "global" 状态。我研究了各种组合运算符,但没有找到我需要的。如果我忽略了一些明显的东西,我深表歉意,因为我对 RxJava.
还很陌生如果您有 3 个单独的任务 return [0, 1]
、[10, 11]
和 [20, 21]
,您想将它们合并到一个列表中。在这种情况下,您可以使用zip
操作。
public class TestRx {
public static void main(String[] args) {
// some individual observables.
Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1));
Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11));
Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21));
Observable.zip(observable1, observable2, observable3,
new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() {
@Override
public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) {
// TODO: Remove existing items
// merge all lists
List<Integer> mergedList = new ArrayList<>();
mergedList.addAll(list1);
mergedList.addAll(list2);
mergedList.addAll(list3);
return mergedList;
}
})
.subscribe(new Observer<List<Integer>>() {
@Override
public void onNext(List<Integer> mergedList) {
System.out.println(mergedList);
// TODO: notifyDataChanged(mergedList)
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.toString());
// TODO: handle exceptions
}
@Override
public void onCompleted() {
// TODO: notifyLoadingComplete()
}
});
}
}
结果,它打印成这样[0, 1, 10, 11, 20, 21]
。
天真的选项是,调用者存储最后一个列表并在您请求新数据时将其作为参数提供:
public class ReactiveMultipleSources {
// region Classes
public enum SourceType {
TYPE_ARTICLE,
TYPE_MESSAGE,
TYPE_VIDEO
}
public static class Feed {
private SourceType sourceType;
private String content;
Feed(SourceType sourceType, String content) {
this.sourceType = sourceType;
this.content = content;
}
SourceType getSourceType() {
return sourceType;
}
}
// endregion
public static void main(String[] args) throws InterruptedException {
final List<Feed>[] currentList = new List[]{new ArrayList()};
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
System.out.println();
// Simulate refresh
refreshContent(currentList[0])
.subscribe(feeds -> {
currentList[0] = feeds;
for (int i = 0; i < currentList[0].size(); i++) {
System.out.println(currentList[0].get(i).content);
}
});
Thread.sleep(2000);
}
private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) {
return Observable.fromIterable(getSourceTypes())
.observeOn(Schedulers.io())
// Get List<Feed> forEach sourceType
.concatMap(ReactiveMultipleSources::getFeedItemsBySourceType)
.observeOn(Schedulers.computation())
// Get list of "List of Feed for sourceType", = List<List<Feed>>
.toList()
.map(lists -> {
for (List<Feed> list : lists) {
SourceType sourceType = list.get(0).getSourceType();
// Remove items of currentFeed whose sourceType has new List<Feed>
currentFeed.removeIf(temp -> temp.getSourceType() == sourceType);
// Add new items
currentFeed.addAll(list);
}
return currentFeed;
})
.toObservable();
}
// region Helper
private static List<SourceType> getSourceTypes() {
return new ArrayList<>(Arrays.asList(SourceType.values()));
}
private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) {
String content;
if (sourceType == SourceType.TYPE_ARTICLE)
content = "article ";
else if (sourceType == SourceType.TYPE_MESSAGE)
content = "message ";
else if (sourceType == SourceType.TYPE_VIDEO)
content = "video ";
else
content = "article ";
Feed feed1 = new Feed(sourceType, content + createRandomInt());
Feed feed2 = new Feed(sourceType, content + createRandomInt());
Feed feed3 = new Feed(sourceType, content + createRandomInt());
Feed feed4 = new Feed(sourceType, content + createRandomInt());
return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4));
}
// For simulating different items each time List<Feed> is required
private static int createRandomInt() {
return ThreadLocalRandom.current().nextInt(0, 21);
}
// endregion
}
示例输出:
article 19
article 15
article 18
article 18
message 3
message 2
message 9
message 1
video 19
video 17
video 18
video 11
article 0
article 4
article 18
article 15
message 11
message 16
message 16
message 4
video 1
video 7
video 20
video 2