Thread pipelining with RxJava

RxJava gurus, here is your chance to shine!

Can you guarantee that the program below never throws an IllegalStateException, changing only the RxJava pipeline in main() that starts with Flowable.generate()?

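// Imports assume RxJava 2.x; RxJava 3.x uses the io.reactivex.rxjava3.core
// and io.reactivex.rxjava3.schedulers packages instead.
import static java.util.concurrent.Executors.newFixedThreadPool;

import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

class ExportJob {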
    private static Scheduler singleThread(String threadName) {
        return Schedulers.from(newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        }));
    }

    public static void main(String[] args) {
        Scheduler genSched = singleThread("genThread");
        Scheduler mapSched = singleThread("mapThread");
        // execute on "genThread"
        Flowable.generate(ExportJob::infiniteGenerator)
                .subscribeOn(genSched, false)
                // execute on "mapThread"
                .observeOn(mapSched, false)
                .concatMapMaybe(ExportJob::mapping)
                // execute on the thread that creates the pipeline, block it until finished
                .blockingForEach(ExportJob::terminal);
    }

    private static int nb;
    /** Must execute on "genThread" thread. */
    private static void infiniteGenerator(Emitter<Integer> emitter) {
        print(nb, "infiniteGenerator");
        emitter.onNext(nb++);
        checkCurrentThread("genThread");
    }

    /** Must execute on "mapThread" thread. */
    private static Maybe<Integer> mapping(Integer s) {
        print(s, "mapping");
        checkCurrentThread("mapThread");
        return Maybe.just(s);
    }

    /** Must execute on the "main" thread, i.e. the thread that creates the pipeline. */
    private static void terminal(Integer s) {
        print(s, "terminal");
        checkCurrentThread("main");
    }

    private static void print(int item, String method) {
        System.out.format("%d - %s - %s()%n", item, Thread.currentThread().getName(), method);
    }

    private static void checkCurrentThread(String expectedThreadName) throws IllegalStateException {
        String name = Thread.currentThread().getName();
        if (!name.equals(expectedThreadName)) {
            throw new IllegalStateException("Thread changed from '" + expectedThreadName + "' to '" + name + "'");
        }
    }
}

You have to use subscribeOn(scheduler, true) so that requests are also routed back to their intended threads:

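Flowable.generate(ExportJob::infiniteGenerator)
        // subscribe on "genThread" and route upstream requests there as well
        .subscribeOn(genSched, true)  // <------------------------------
        // execute on "mapThread"
        .observeOn(mapSched, false)
        .concatMapMaybe(ExportJob::mapping)
        // route the requests issued by the blocking consumer to "mapThread"
        .subscribeOn(mapSched, true)  // <------------------------------
        // execute on the thread that creates the pipeline, block it until finished
        .blockingForEach(ExportJob::terminal);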
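
To see why the boolean matters, here is a rough JDK-only model of the idea. It is not RxJava's implementation: the RequestRoutingSketch class, its Source type, and the routeRequests flag are all made up for illustration. It only assumes that a generator-backed source produces an item on whichever thread calls request(), unless the request is first handed over to the source's own thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RequestRoutingSketch {

    /** Hypothetical stand-in for a generator-backed source: every request runs the generator once. */
    static final class Source {
        private final ExecutorService genExecutor;
        // Plays the role of the boolean passed to subscribeOn(scheduler, boolean).
        private final boolean routeRequests;
        final List<String> generatorThreads = new CopyOnWriteArrayList<>();

        Source(ExecutorService genExecutor, boolean routeRequests) {
            this.genExecutor = genExecutor;
            this.routeRequests = routeRequests;
        }

        /** Called by the downstream consumer, on whatever thread it happens to be running. */
        void request() {
            if (routeRequests) {
                genExecutor.execute(this::generate); // hop to "genThread" first
            } else {
                generate();                          // runs on the caller's thread
            }
        }

        private void generate() {
            generatorThreads.add(Thread.currentThread().getName());
        }
    }

    private static ExecutorService singleThread(String threadName) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    /** Returns the names of the threads on which the generator ran. */
    static List<String> run(boolean routeRequests) {
        ExecutorService genExecutor = singleThread("genThread");
        ExecutorService mapExecutor = singleThread("mapThread");
        Source source = new Source(genExecutor, routeRequests);
        // The downstream stage asks for three items from "mapThread".
        mapExecutor.execute(() -> {
            for (int i = 0; i < 3; i++) {
                source.request();
            }
        });
        try {
            // Wait for the requests to be issued, then for the generator work to drain.
            mapExecutor.shutdown();
            mapExecutor.awaitTermination(5, TimeUnit.SECONDS);
            genExecutor.shutdown();
            genExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return source.generatorThreads;
    }

    public static void main(String[] args) {
        System.out.println("requests not routed: " + run(false));
        System.out.println("requests routed:     " + run(true));
    }
}
```

Run as is, the first line lists mapThread three times and the second lists genThread three times. That mirrors the failure in the question: without request routing the generator runs on the thread of whoever asks for more items, which is why checkCurrentThread("genThread") trips.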