如何从 Observable 手动请求事件?

How to manually request event from Observable?

我正在使用 RxJavaRetrofit 来创建一个应用程序,我有这样一个场景,在我的视图中,一个项目列表从服务器加载并显示给用户。用户可以通过单击刷新按钮来刷新数据。以下是我使用 MVP 的场景的粗略实现。

// This is my retrofit adapter service
public interface ApiService{
    @GET("items");
    Observable<JsonElement> getItemsList();
}

// This is my presnter
public class MyPresnter{
    ...
    public Observable<List<Item>> loadItemsList(){
        return apiService.getItemsList().map(jsonElement -> {
            // here I convert my api response to a List of Items
            return new ArrayList<Item>();
        });
    }
}

// And inside my view I use RxBinding to bind my button click events
RxView.clicks(refreshBtn).subscribe(view -> mPresenter.loadItemsList()
                                            .subscribeOn(Schdulers.io())
                                            .observeOn(AndroidSchedulers.mainThread())
                                            .subscribe(list -> {/* update view */})
                                   );

现在我的问题是,当用户第一次进入视图时,我应该如何在不单击 refreshBtn 的情况下请求数据?我应该使用 refreshBtn.performOnClick() 吗?
或者有没有办法在已经订阅的可观察对象上手动请求事件?

我猜你可以调用 performOnClick,前提是你事先设置了 RxView.clicks()

通常,您可能希望合并到一个 Subject 中,它也可以触发操作:

PublishSubject<Object> manualRefresh = PublishSubject.create();

RxView.clicks(refreshBtn)
.cast(Object.class)
.mergeWith(manualRefresh.onBackpressureLatest())
.switchMap(v -> mPresenter.loadItemsList()
                .subscribeOn(Schdulers.io())
                .observeOn(AndroidSchedulers.mainThread())
)
.subscribe(list -> /* update */);

manualRefresh.onNext("Now!");

要回答这个问题,我们需要质疑流的逻辑。通常 不好的做法 在前一个的订阅方法中订阅一个新的可观察对象。在某些情况下,可能没有更好的解决方法,但这至少表明需要进行调查。流的组合是 RxJava 最大的优势之一。

这里的逻辑表明存在一系列事件(点击和手动调用),您希望通过订阅另一个流来响应这些事件。有一个运算符具有这种确切的行为,称为 flatMap.

多亏了 flatMap 运算符,我们已经可以在 某事 发生时做出反应:

refreshEventStream
    .flatMap(object -> {
        return mPresenter.loadItemsList().subscribeOn(Schdulers.io())
    })
    .subscribe(list -> /* update UI */)

现在,我们需要弄清楚 something 究竟代表什么。如您所述,目前有两种调用刷新的方法。通过点击事件和手动刷新。换句话说,我们希望将这些事件合并到一个流中。同样,我们可以使用一个名为 mergeWith 的漂亮而闪亮的运算符。但是,对于手动调用,我们需要创建 Subject ( http://reactivex.io/documentation/subject.html ),它既像 Observable 又像 Subscriber。

 PublishSubject<Void> refreshSubject = PublishSubject.create()
 refreshEventStream = refreshSubject
                          .mergeWith(Rx.clicks(refreshBtn).throttleFirst(500, TimeUnit.MILLISECONDS))

 refreshEventStream
    .flatMap(object -> {
        return mPresenter.loadItemsList().subscribeOn(Schdulers.io())
    })
    .subscribe(list -> /* update UI */)

我在点击流中添加了 throttleFirst 运算符,因为您可能不想让用户向这样的请求发送垃圾邮件:-) 它唯一做的就是从该流中获取一个事件并在再次对其做出反应之前在指定的时间内忽略其余的排放量。