Can I parallelise an operation driven by a flatMap?
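Suppose I have a method like the code below, in which a list is flat-mapped into individual strings and an expensive operation is applied to each string. Is there a way to parallelise the expensive operation, much as I would with parallelStream() in Java 8?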
final List<String> names = new ArrayList<String>() {{
add("Ringo");
add("John");
add("Paul");
add("George");
}};
Observable.just(names).subscribeOn(Schedulers.io())
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(final List<String> names) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (String name : names) {
subscriber.onNext(name);
}
}
});
}
})
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//Simulate expensive operation
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.v("RXExample", s + " on " + Thread.currentThread().getName());
}
});
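For comparison, the Java 8 parallelStream() behaviour the question refers to could be sketched roughly as follows. This is a plain-JDK illustration, not RxJava; the class name ParallelStreamBaseline, the helper expensiveUpperCase, and the shortened 100 ms sleep are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamBaseline {

    // Stand-in for the expensive operation (hypothetical helper; the sleep is shortened for illustration).
    static String expensiveUpperCase(String s) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return s.toUpperCase();
    }

    // Maps the names in parallel on the common ForkJoinPool.
    // Collecting to a list keeps the encounter order of the source list.
    static List<String> upperCaseAll(List<String> names) {
        return names.parallelStream()
                .map(ParallelStreamBaseline::expensiveUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Ringo", "John", "Paul", "George");
        System.out.println(upperCaseAll(names)); // prints [RINGO, JOHN, PAUL, GEORGE]
    }
}
```

The goal in the question is the same effect inside an Rx chain: each element's expensive step runs on its own worker, rather than the four steps running one after another.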
For completeness, here is the code with the changes recommended in the answer applied; it works well!
final List<String> names = new ArrayList<String>() {{
add("Ringo");
add("John");
add("Paul");
add("George");
}};
Observable.just(names).subscribeOn(Schedulers.io())
.flatMap(new Func1<List<String>, Observable<String>>() {
@Override
public Observable<String> call(final List<String> names) {
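// Create the pool once and share it; building a new pool for every name would leak threads.
final Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(5));
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
for (final String name : names) {
Observable
.just(name)
.subscribeOn(scheduler)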
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//Simulate expensive operation
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return s.toUpperCase();
}
}).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
// Forward the transformed value rather than the original name.
subscriber.onNext(s);
}
});
}
}
});
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.v("RXExample", s + " on " + Thread.currentThread().getName());
}
});
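The fixed-size pool idea used above can also be shown without RxJava. The following is a minimal sketch using only java.util.concurrent, in which one pool is created up front, every name is submitted to it, and the pool is shut down once the results are in. The class name SharedPoolSketch, the helper expensiveUpperCase, and the 100 ms sleep are assumptions for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedPoolSketch {

    // Stand-in for the expensive operation (hypothetical helper; the sleep is shortened for illustration).
    static String expensiveUpperCase(String s) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.toUpperCase();
    }

    // Runs the expensive step for every name on one shared fixed-size pool
    // and returns the results in the same order as the input list.
    static List<String> upperCaseAll(List<String> names, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            // Submit all tasks first so they can run concurrently.
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String name : names) {
                futures.add(CompletableFuture.supplyAsync(() -> expensiveUpperCase(name), pool));
            }
            // Then wait for each result in submission order.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            // Release the worker threads once all work is done.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Ringo", "John", "Paul", "George");
        System.out.println(upperCaseAll(names, 5)); // prints [RINGO, JOHN, PAUL, GEORGE]
    }
}
```

With a pool of five threads and four names, all four tasks can run at once, so the total time is close to that of a single task.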
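You can use flatMap to process the items in parallel, as in the example below. I tested this with RxJava2.
For a further explanation, read about this use of flatMap here: http://tomstechnicalblog.blogspot.de/2015/11/rxjava-achieving-parallelization.html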
@Test
public void name() throws Exception {
final List<String> names = new ArrayList<String>() {{
add("Ringo");
add("John");
add("Paul");
add("George");
}};
Observable<String> stringObservable = Observable.fromIterable(names)
.flatMap(s -> {
return longWork(s).doOnNext(s1 -> {
printCurrentThread(s1);
}).subscribeOn(Schedulers.newThread());
});
TestObserver<String> test = stringObservable.test();
test.awaitDone(2_000, TimeUnit.MILLISECONDS).assertValueCount(4);
}
private Observable<String> longWork(String s) throws InterruptedException {
return Observable.fromCallable(() -> {
Thread.sleep(1_000);
return s;
});
}
private void printCurrentThread(String additional) {
System.out.println(additional + "_" + Thread.currentThread());
}