使用 RxJava 对数组进行异步 forEach
Asynchronous forEach over an array with RxJava
我正在尝试遍历一组地图并执行一些异步操作。我已经使用 RxJava 库尝试了一些事情,但我尝试过的一切似乎都是同步的。我试图避免手动创建新线程并希望让 RxJava 处理它。这是我到目前为止尝试过的。
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.forEach(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
当我 运行 使用上面的代码进行单元测试时,我看到以下输出。
1
2
1
2
1
2
...
我想看的是
1
1
1
...
2
2
2
如何使用 RxJava 异步迭代 Map 数组?
你可以实现它从 Observable 变为 Flowable 并使用并行:
Flowable.fromIterable(array)
.parallel(3) // number of items in parallel
.runOn(Schedulers.newThread()) // the desired scheduler
.map(item -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
return Completable.complete();
})
.sequential().subscribe();
如果您无法使用 RxJava 1.x,那么您将无法访问 Flowable class。这不是我的情况,但类似下面的代码可以执行并行操作。有更多的嵌套,但它有效。
final ExecutorService executor = Executors.newFixedThreadPool(2);
List<String> iterableList = new ArrayList<>();
iterableList.add("one");
iterableList.add("two");
iterableList.add("three");
iterableList.add("4");
iterableList.add("5");
iterableList.add("6");
iterableList.add("7");
iterableList.add("8");
iterableList.add("9");
Observable.from(iterableList)
.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.from(executor))
.doOnNext(numString -> {
try {
System.out.println(1);
Thread.sleep(500);
System.out.println(2);
} catch (Exception ex) {
}
})
)
.subscribe();
我正在尝试遍历一组地图并执行一些异步操作。我已经使用 RxJava 库尝试了一些事情,但我尝试过的一切似乎都是同步的。我试图避免手动创建新线程并希望让 RxJava 处理它。这是我到目前为止尝试过的。
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.forEach(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribeOn(Schedulers.newThread())
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
Observable.from(new Map[20])
.subscribe(batch -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
});
当我 运行 使用上面的代码进行单元测试时,我看到以下输出。
1
2
1
2
1
2
...
我想看的是
1
1
1
...
2
2
2
如何使用 RxJava 异步迭代 Map 数组?
你可以实现它从 Observable 变为 Flowable 并使用并行:
Flowable.fromIterable(array)
.parallel(3) // number of items in parallel
.runOn(Schedulers.newThread()) // the desired scheduler
.map(item -> {
try {
System.out.println(1);
Thread.sleep(3000);
System.out.println(2);
} catch (Exception e) {
}
return Completable.complete();
})
.sequential().subscribe();
如果您无法使用 RxJava 1.x,那么您将无法访问 Flowable class。这不是我的情况,但类似下面的代码可以执行并行操作。有更多的嵌套,但它有效。
final ExecutorService executor = Executors.newFixedThreadPool(2);
List<String> iterableList = new ArrayList<>();
iterableList.add("one");
iterableList.add("two");
iterableList.add("three");
iterableList.add("4");
iterableList.add("5");
iterableList.add("6");
iterableList.add("7");
iterableList.add("8");
iterableList.add("9");
Observable.from(iterableList)
.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.from(executor))
.doOnNext(numString -> {
try {
System.out.println(1);
Thread.sleep(500);
System.out.println(2);
} catch (Exception ex) {
}
})
)
.subscribe();