使用 RxJava 对数组进行异步 forEach

Asynchronous forEach over an array with RxJava

我正在尝试遍历一组地图并执行一些异步操作。我已经使用 RxJava 库尝试了一些事情,但我尝试过的一切似乎都是同步的。我试图避免手动创建新线程并希望让 RxJava 处理它。这是我到目前为止尝试过的。

Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread()) 
            .observeOn(Schedulers.computation())
            .forEach(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.computation())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribeOn(Schedulers.newThread())
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

    Observable.from(new Map[20])
            .subscribe(batch -> {
                try {
                    System.out.println(1);
                    Thread.sleep(3000);
                    System.out.println(2);
                } catch (Exception e) {

                }
            });

当我 运行 使用上面的代码进行单元测试时,我看到以下输出。

1
2
1
2
1
2
...

我想看的是

1
1
1
... 
2
2
2

如何使用 RxJava 异步迭代 Map 数组?

你可以实现它从 Observable 变为 Flowable 并使用并行:

        Flowable.fromIterable(array)
                .parallel(3) // number of items in parallel
                .runOn(Schedulers.newThread()) // the desired scheduler
                .map(item -> {
                    try {
                        System.out.println(1);
                        Thread.sleep(3000);
                        System.out.println(2);
                    } catch (Exception e) {

                    }

                    return Completable.complete();
                })
        .sequential().subscribe();

如果您无法使用 RxJava 1.x,那么您将无法访问 Flowable class。这不是我的情况,但类似下面的代码可以执行并行操作。有更多的嵌套,但它有效。

    final ExecutorService executor = Executors.newFixedThreadPool(2);
    List<String> iterableList = new ArrayList<>();
    iterableList.add("one");
    iterableList.add("two");
    iterableList.add("three");
    iterableList.add("4");
    iterableList.add("5");
    iterableList.add("6");
    iterableList.add("7");
    iterableList.add("8");
    iterableList.add("9");
    Observable.from(iterableList)
            .flatMap(val -> Observable.just(val)
                    .subscribeOn(Schedulers.from(executor))
                    .doOnNext(numString -> {
                        try {
                            System.out.println(1);
                            Thread.sleep(500);
                            System.out.println(2);
                        } catch (Exception ex) {
                        }
                    })
            )
            .subscribe();