RxJava 2 - 如何链接异步调用

RxJava 2 - How to chain asyncronous calls

我最近才开始学习 RxJava 所以如果我问的是一个新手问题,请不要责备我,但我已经花了几天时间试图解决这个问题,但没有成功。我已经阅读了几乎所有我能找到的文档,并且我遵循了 http://reactivex.io/tutorials.html 上的大部分教程。我搜索了 Whosebug 和互联网的其他部分,但显然我似乎是这个星球上唯一遇到这个问题的人。这很奇怪,因为它本质上归结为每个软件都必须做的事情:登录用户。

我找到的所有教程都是关于在流上应用一些函数来创建一个有用且很棒的新流的,请不要误会我的意思,但对我来说并不是很有帮助。这也让我开始思考……也许我做错了。但我现在陷入如此深的困境,并且还遵循 "everything is a stream" 为什么那不可能?

的咒语

这就是我要做的事情:

  1. 表示某种形式的加载
  2. 调用 Completable 在某些服务器上执行登录操作
  3. 调用 Single 在某些服务器上执行创建用户操作,其中 returns 本地引用的用户 ID
  4. 在下一个操作中使用 Single 调用的结果并隐藏加载

尽管我会在 Android 上结束这个,但我创建了一个基本的 Java 8 示例来概述我想要实现的目标。

这是我到目前为止想出的:

备注:

这里是代码的可运行版本:

public static void main(final String[] args) {
    getMainStream()
        .doOnNext(__ -> showLoading())
        .flatMap(__ -> loginUser().toObservable())
        .flatMap(__ -> createUser().toObservable())
        .doOnNext(userId -> {
            hideLoading();
            System.out.println("userId: " + userId);
        })
        .subscribe();
}

public static Completable loginUser() {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            Thread.sleep(500);
            System.out.println("loginUser");
            e.onComplete();
        }
    });
}

public static Single<String> createUser() {
    return Single.<String>create(new SingleOnSubscribe<String>() {
        @Override
        public void subscribe(final SingleEmitter<String> e) throws Exception {
            Thread.sleep(1000);
            System.out.println("createUser");
            e.onSuccess("some_user_id");
        }
    });
}

public static Completable getCompletable(final String input) {
    return Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(final CompletableEmitter e) throws Exception {
            Thread.sleep(750);
            System.out.println("completable, input=" + input);
            e.onComplete();
        }
    });
}

public static Observable<Object> getMainStream() {
    return Observable.just(new Object());
}

private static void hideLoading() {
    System.out.println("hideLoading()");
}

private static void showLoading() {
    System.out.println("showLoading()");
}

控制台输出为:

showLoading()
loginUser

不幸的是,登录用户永远不会returns?!

我真的很期待关于这个主题的任何帮助!

谢谢!!!

loginUser() 是一个 Completable,当您将 Completable 转换为 Observable 时,效果是 observable 将完成。因此,createUser().

没有要对下游值进行操作的

您可以考虑将表达式更改为 loginUser().andThen( () -> createUser().toObservable(),这会导致发出字符串。