Can I parallelise an operation driven by a flatMap?

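Suppose I have a method like the code below, in which a list is flat-mapped into individual strings and an expensive operation is applied to each string. Is there a way to parallelise the expensive operation, the way I would with parallelStream() in Java 8?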

final List<String> names = new ArrayList<String>() {{
        add("Ringo");
        add("John");
        add("Paul");
        add("George");
    }};

    Observable.just(names).subscribeOn(Schedulers.io())
            .flatMap(new Func1<List<String>, Observable<String>>() {
                @Override
                public Observable<String> call(final List<String> names) {
                    return Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super String> subscriber) {
                            for (String name : names) {
                                subscriber.onNext(name);
                            }
                            // Signal completion so the downstream onCompleted() is called
                            subscriber.onCompleted();
                        }
                    });
                }
            })
            .map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    //Simulate expensive operation
                    try {
                        Thread.sleep(6000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return s.toUpperCase();
                }
            }).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
            Log.v("RXExample", s + " on " + Thread.currentThread().getName());
        }
    });
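
For reference, the Java 8 parallelStream() behaviour the question wants to reproduce can be sketched roughly as follows. This is only an illustration using the standard library: the class name ParallelStreamSketch and the helper expensiveUpperCase are hypothetical, and the sleep is shortened from the 6000 ms in the question so the sketch finishes quickly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamSketch {

    // Hypothetical stand-in for the expensive operation in the question
    // (sleep shortened from 6000 ms to keep the sketch fast).
    static String expensiveUpperCase(String s) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        return s.toUpperCase();
    }

    // Applies the expensive operation to each name via parallelStream().
    // Collecting to a list keeps the encounter order of the source list,
    // even though the individual map calls may run on different threads.
    static List<String> upperCaseInParallel(List<String> names) {
        return names.parallelStream()
                .map(ParallelStreamSketch::expensiveUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Ringo", "John", "Paul", "George");
        System.out.println(upperCaseInParallel(names));
    }
}
```

The map step here is the part that runs concurrently (on the common fork-join pool by default), which is the effect being asked for in the Rx pipeline.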

For completeness, here is the code with the changes recommended in the answer applied, and it works well!

final List<String> names = new ArrayList<String>() {{
        add("Ringo");
        add("John");
        add("Paul");
        add("George");
    }};

    Observable.just(names).subscribeOn(Schedulers.io())
            .flatMap(new Func1<List<String>, Observable<String>>() {
                @Override
                public Observable<String> call(final List<String> names) {
                    return Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(final Subscriber<? super String> subscriber) {
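                            // Create the pool once and share it across all names;
                            // building a new 5-thread pool per name would leak threads
                            final Scheduler pool = Schedulers.from(Executors.newFixedThreadPool(5));
                            for (final String name : names) {
                                Observable
                                        .just(name)
                                        .subscribeOn(pool)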
                                        .map(new Func1<String, String>() {
                                            @Override
                                            public String call(String s) {
                                                //Simulate expensive operation
                                                try {
                                                    Thread.sleep(6000);
                                                } catch (InterruptedException e) {
                                                    e.printStackTrace();
                                                }
                                                return s.toUpperCase();
                                            }
                                        }).subscribe(new Observer<String>() {
                                    @Override
                                    public void onCompleted() {

                                    }

                                    @Override
                                    public void onError(Throwable e) {

                                    }

                                    @Override
                                    public void onNext(String s) {
                                        // Forward the mapped (upper-cased) value, not the original name
                                        subscriber.onNext(s);
                                    }
                                });
                            }
                        }
                    });
                }
            })
            .subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
            Log.v("RXExample", s + " on " + Thread.currentThread().getName());
        }
    });

You can use flatMap to process the items in parallel, as in the example below. I tested this with RxJava2.

For a further explanation of this use of flatMap, see: http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html

@Test
public void name() throws Exception {
    final List<String> names = new ArrayList<String>() {{
        add("Ringo");
        add("John");
        add("Paul");
        add("George");
    }};

    Observable<String> stringObservable = Observable.fromIterable(names)
            .flatMap(s -> {
                return longWork(s).doOnNext(s1 -> {
                    printCurrentThread(s1);
                }).subscribeOn(Schedulers.newThread());
            });

    TestObserver<String> test = stringObservable.test();

    test.awaitDone(2_000, TimeUnit.MILLISECONDS).assertValueCount(4);
}

private Observable<String> longWork(String s) throws InterruptedException {
    return Observable.fromCallable(() -> {
        Thread.sleep(1_000);

        return s;
    });
}

private void printCurrentThread(String additional) {
    System.out.println(additional + "_" + Thread.currentThread());
}