RxJava 2、Retrofit 2 + Retrolambda - 将 2 个请求链接在一起

RxJava 2, Retrofit 2 + Retrolambda - chaining 2 requests together

第一个 API 调用 return 一个元素列表,然后我想随后用 String return 调用另一个 API来自第一个 API 调用的列表元素。我(想我)已经知道了,所以它会调用列表中每个元素的第二个 API 调用,但我不确定如何订阅它以从第二个调用中获取结果 returned .

discogsService.getSearchResults(searchTerm, mContext.getString(R.string.token))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                // Turns the result into individual elements
                .flatMapIterable(RootSearchResponse::getSearchResults)
                // I believe this then calls .getRelease() with each ID string
                .map(result -> discogsService.getRelease(result.getId()));

改造界面:

public interface DiscogsService
{
    @GET("database/search?")
    Observable<RootSearchResponse> getSearchResults(@Query("q") String searchTerm, @Query("token") String token);

    @GET("releases/")
    Observable<Release> getRelease(@Query("release_id") String releaseId);
}

我不确定从这里到哪里去。

我相信 .subscribe(...) 让我能够从每个 .getRelease(...) 中获得 Observable<Release> returned。由于在模型层中调用了上述方法,因此我需要在此模型层中设置一个订阅者以传递回 Presenter,然后在 Presenter 中设置一个额外的订阅者来处理每个 Observable 因为 Presenter 可以访问 View

有没有办法让我可以 return 来自模型层的每个 Observable,这样我就不需要有两个单独的 .subscribe(...)?或者我应该使用两个单独的 .subscribe(...)s 因为我可以在它们两个上捕获错误?我只想要第二次调用的结果。

这是我试过的完整代码:

模型中:

discogsService.getSearchResults(searchTerm, mContext.getString(R.string.token))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .flatMapIterable(RootSearchResponse::getSearchResults)
                .subscribeOn(Schedulers.io())
                .map(result -> discogsService.getRelease(result.getId()))
                .subscribe(new Observer<Observable<Release>>()
                {
                    @Override
                    public void onSubscribe(Disposable d)
                    {

                    }

                    @Override
                    public void onNext(Observable<Release> value)
                    {
                        mainPresenter.addToRecyclerView(value);
                    }

                    @Override
                    public void onError(Throwable e)
                    {

                    }

                    @Override
                    public void onComplete()
                    {

                    }
                });

在演示者中:

@Override
public void addToRecyclerView(Observable<Release> value)
{
    value       .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Release>()
                {
                    @Override
                    public void onSubscribe(Disposable d)
                    {

                    }

                    @Override
                    public void onNext(Release value)
                    {
                        Log.e(TAG, "Success! " + value);
                        results.add(value);
                    }

                    @Override
                    public void onError(Throwable e)
                    {
                        Log.e(TAG, "Error: " + e.toString());
                        Log.e(TAG, "Error: " + e.toString());
                    }

                    @Override
                    public void onComplete()
                    {

                    }
                });

我宁愿在模型级别公开 Observable<Release>

Observable<Release> getReleases(...) {
    return discogsService.getSearchResults(...)
        .flatMapIterable(RootSearchResponse::getSearchResults)
        .flatMap(result -> discogsService.getRelease(result.getId()));
}

演示者只需订阅即可:

getReleases
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<Release>()
    {
        @Override
        public void onSubscribe(Disposable d)
        {

        }

        @Override
        public void onNext(Release value)
        {
            Log.e(TAG, "Success! " + value);
            results.add(value);
        }

        @Override
        public void onError(Throwable e)
        {
            Log.e(TAG, "Error: " + e.toString());
            Log.e(TAG, "Error: " + e.toString());
        }

        @Override
        public void onComplete()
        {

        }
    });

只有一个 Observable。请注意 getReleases(...) 中的第二个请求从 map() 切换到 flatMap()。在幕后,这是第二次订阅发生的地方。

最终订阅将收到来自两个请求的错误。我更喜欢让消费者 (Presenter) 处理错误,因为它是关心响应并知道在出现错误时该怎么做的人(例如显示一条消息)。

是 'drive' Observable 的创建者、处置者,所以分配线程恕我直言也是他的职责。

Observable 非常适合从一层暴露到另一层。它描述了数据类型、如何使用它和模式(Observable?Single?Flowable?)。