Android RxJava 并行化 - 使用 ExecutorService 计算

Android RxJava parallelization - computing with ExecutorService

我已经用 RxJava 成功完成了一个小 Java 程序。代码是:

    public static void main( String[] args ) {
        int threadCt = Runtime.getRuntime().availableProcessors() + 1;
        //multi-threading
        ExecutorService executor = Executors.newFixedThreadPool(threadCt);
        Scheduler scheduler = Schedulers.from(executor);

        final AtomicInteger batch = new AtomicInteger(0);

        Observable.range(1,80)
            .groupBy(i -> batch.getAndIncrement() % threadCt )
            .flatMap(g -> g.observeOn(scheduler)
                    .map(i -> intenseCalculation(i))
            ).subscribe(System.out::println);
    }



    public static int intenseCalculation(int i) {
       try {
          System.out.println("Calculating " + i +
                " on " + Thread.currentThread().getName());
          Thread.sleep(500);
          return i;
       } catch (InterruptedException e) {
        throw new RuntimeException(e);
       }
    }

使用此代码一切正常。现在我试图将此代码传递给 Android:

        Scheduler scheduler = Schedulers.from(executor);

        final AtomicInteger batch = new AtomicInteger(0);
        Observable.range(0, copiedCategories.size() - 1)
                .groupBy(i -> batch.getAndIncrement() % threadCt)
                .flatMap(g -> g.observeOn(scheduler))
                .map(i -> intenseCalculation(i))
                .subscribe(finishedListener::finished);

在 finished() 方法中,我正在更新 GUI(finishedListener 是当前 Activity 正在实现的接口)。

我在 map(i -> intenseCalculation(i)):

行中遇到错误
no instance(s) of type variable(s) exist so that void conforms to R

在 build.gradle 中(对于应用程序)我正在使用:

compile 'io.reactivex:rxjava:1.2.9'

我该如何解决这个问题?

在 java 代码中你有:

.flatMap(g -> g.observeOn(scheduler)
               .map(i -> intenseCalculation(i))
        )

但在 Android 代码中,您正在对主流执行 map()

.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))

所以,这些不是一回事,你在 Android 代码中有额外的括号,这完全改变了流逻辑。

注意,你应该在UI线程上执行UI相关操作,所以你必须在订阅之前执行observeOn(AndroidSchedulers.mainThread())流,否则你会崩溃。