Android RxJava 并行化 - 使用 ExecutorService 计算
Android RxJava parallelization - computing with ExecutorService
我已经用 RxJava 成功完成了一个小 Java 程序。代码是:
public static void main( String[] args ) {
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
//multi-threading
ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1,80)
.groupBy(i -> batch.getAndIncrement() % threadCt )
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
).subscribe(System.out::println);
}
public static int intenseCalculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(500);
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
使用此代码一切正常。现在我试图将此代码传递给 Android:
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(0, copiedCategories.size() - 1)
.groupBy(i -> batch.getAndIncrement() % threadCt)
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
.subscribe(finishedListener::finished);
在 finished() 方法中,我正在更新 GUI(finishedListener 是当前 Activity 正在实现的接口)。
我在 map(i -> intenseCalculation(i)):
行中遇到错误
no instance(s) of type variable(s) exist so that void conforms to R
在 build.gradle 中(对于应用程序)我正在使用:
compile 'io.reactivex:rxjava:1.2.9'
我该如何解决这个问题?
在 java 代码中你有:
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
)
但在 Android 代码中,您正在对主流执行 map()
:
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
所以,这些不是一回事,你在 Android 代码中有额外的括号,这完全改变了流逻辑。
注意,你应该在UI线程上执行UI相关操作,所以你必须在订阅之前执行observeOn(AndroidSchedulers.mainThread())
流,否则你会崩溃。
我已经用 RxJava 成功完成了一个小 Java 程序。代码是:
public static void main( String[] args ) {
int threadCt = Runtime.getRuntime().availableProcessors() + 1;
//multi-threading
ExecutorService executor = Executors.newFixedThreadPool(threadCt);
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(1,80)
.groupBy(i -> batch.getAndIncrement() % threadCt )
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
).subscribe(System.out::println);
}
public static int intenseCalculation(int i) {
try {
System.out.println("Calculating " + i +
" on " + Thread.currentThread().getName());
Thread.sleep(500);
return i;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
使用此代码一切正常。现在我试图将此代码传递给 Android:
Scheduler scheduler = Schedulers.from(executor);
final AtomicInteger batch = new AtomicInteger(0);
Observable.range(0, copiedCategories.size() - 1)
.groupBy(i -> batch.getAndIncrement() % threadCt)
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
.subscribe(finishedListener::finished);
在 finished() 方法中,我正在更新 GUI(finishedListener 是当前 Activity 正在实现的接口)。
我在 map(i -> intenseCalculation(i)):
行中遇到错误no instance(s) of type variable(s) exist so that void conforms to R
在 build.gradle 中(对于应用程序)我正在使用:
compile 'io.reactivex:rxjava:1.2.9'
我该如何解决这个问题?
在 java 代码中你有:
.flatMap(g -> g.observeOn(scheduler)
.map(i -> intenseCalculation(i))
)
但在 Android 代码中,您正在对主流执行 map()
:
.flatMap(g -> g.observeOn(scheduler))
.map(i -> intenseCalculation(i))
所以,这些不是一回事,你在 Android 代码中有额外的括号,这完全改变了流逻辑。
注意,你应该在UI线程上执行UI相关操作,所以你必须在订阅之前执行observeOn(AndroidSchedulers.mainThread())
流,否则你会崩溃。