在 RxJava 中倒带一个 Observable?

Rewinding an Observable in RxJava?

我一直在将 RxJava 和响应式编程应用于命令行应用程序。我一直在发现一些非常棒的设计模式,它们真正简化了命令行界面的代码。

但是,有一件事让我很头疼。我经常沿着操作符链发出值,我 concatMap() 一个 Observable.create() 来征求用户对发出值的输入。

这很容易,直到我想提供一种返回到上一次发射的方法。它开始变得非常混乱,尽管我付出了努力,但我知道这并不能 100% 正确地工作。我希望 "BACK" 的用户输入尝试通过从缓存中请求它来倒回一次发射。是否有更好的运营商或变压器来倒带链中先前点的排放并将它们重新发送回来?

public class Test {

    public static void main(String[] args) {

        final Scanner scanner = new Scanner(System.in);
        final List<String> cache = new ArrayList<>();
        final AtomicInteger cursorIndex = new AtomicInteger(-1);


        Observable.just(
                "What is your name?",
                "What is your quest?",
                "What is your favorite color?",
                "What is the capital of Assyria?",
                "What is the velocity of an unladen swallow?"
        )
        .doOnNext(cache::add)
        .concatMap(q -> {
            int cursor = cursorIndex.get();
            if (cursor >= 0 && cursor < cache.size() - 2) {
                return Observable.just(cache.get(cursor));
            } else {
                cursorIndex.incrementAndGet();
                return Observable.just(q);
            }
        })
        .doOnNext(System.out::println)
        .concatMap(q -> Observable.create(s -> {
                    String answer = scanner.nextLine().trim();

                    if (answer.toUpperCase().equals("BACK")) {
                        cursorIndex.decrementAndGet();
                    }
                    s.onNext(answer);
                    s.onCompleted();
                })
        ).filter(s -> !s.equals("BACK"))
         .subscribe(q -> System.out.println("You said: " + q));

    }
}

    What is your name?
    Sir Lancelot
    You said: Sir Lancelot
    What is your quest?
    To seek the holy grail
    You said: To seek the holy grail
    What is your favorite color?
    BACK
    What is your quest?

我想如果 Observable 是有限的并且排放量是可控的(如果每个排放量都需要用户输入,情况应该如此),那么有一种方法。您可以将其收集到 toList() 中并从该列表中创建一个新的 Observable。然后您可以控制迭代并使用布尔标志随时发出倒带信号。

public class Test {

    public static void main(String[] args) {

        final Scanner scanner = new Scanner(System.in);
        final AtomicBoolean rewind = new AtomicBoolean();

        Observable.just(
                "What is your name?",
                "What is your quest?",
                "What is your favorite color?",
                "What is the capital of Assyria?",
                "What is the velocity of an unladen swallow?"
        ).toList().concatMap(l -> Observable.create(s -> {
            for (int i = 0; i < l.size(); i++) {
                s.onNext(l.get(i));
                if (rewind.getAndSet(false) &&  i >= 1) {
                    i = i - 2;
                }
            }
            s.onCompleted();
        }))
        .doOnNext(System.out::println)
        .concatMap(q -> Observable.create(s -> {
                    String answer = scanner.nextLine().trim();

                    if (answer.toUpperCase().equals("BACK")) {
                        rewind.set(true);
                    }
                    s.onNext(answer);
                    s.onCompleted();
                })
        ).filter(s -> !s.equals("BACK")).subscribe(q -> System.out.println("You said: " + q));

    }
}

What is your name?
Sir Lancelot of Camelot
You said: Sir Lancelot of Camelot
What is your quest?
To seek the Holy Grail
You said: To seek the Holy Grail
What is your favorite color?
Blue
You said: Blue
What is the capital of Assyria?
Asher
You said: Ashur
What is the velocity of an unladen swallow?
BACK
What is the capital of Assyria?

我要做的是将索引作为 BehaviorSubject,并在我必须在问题之间切换时调用 onNext:

Scanner scanner = new Scanner(System.in);

String[] questions = {
        "What is your name?",
        "What is your quest?",
        "What is your favorite color?",
        "What is the capital of Assyria?",
        "What is the velocity of an unladen swallow?"
};

String[] cache = new String[questions.length];

BehaviorSubject<Integer> index = BehaviorSubject.create(0);

index
.observeOn(Schedulers.trampoline())
.concatMap(idx -> {
    if (idx == questions.length) {
        index.onCompleted();
        return Observable.empty();
    }

    System.out.println(questions[idx]);
    String answer = scanner.nextLine().trim();

    if ("BACK".equals(answer)) {
        index.onNext(Math.max(0, idx - 1));
        return Observable.empty();
    } else
    if ("QUIT".equals(answer)) {
        index.onCompleted();
        return Observable.empty();
    }

    cache[idx] = answer;
    index.onNext(idx + 1);
    return Observable.just(answer);
})
.subscribe(v -> System.out.println("You said: " + v));