如何异步 运行 这个 Observable?
How to run This Observable asynchronously?
我有以下模仿阻塞 IO 操作的服务:
public class StudentService{
public List<Student> getStudentAslist(){
Thread.sleep(10000);
.....
return students;
}
}
现在我正在尝试 运行 使用 Java 8 环境 FutureTask<T>
中的此服务:
StudentService studentService = new StudentService();
FutureTask<List<Student>> future = new FutureTask<>(()-> studentService.getStudentAslist());
Worker worker = Schedulers.io().createWorker();
worker.schedule(()->future.run());
Observable<Student> observable2 = Observable.from(future).flatMap(Observable::from);
observable2.filter((s)-> s.getAge()>40).subscribe(student->System.out.println(student));
System.out.println("end");
但是最后的打印 "end" 只在 10 秒后打印,所以看起来我是完全同步的,
我怎样才能运行它反应?
我想你要找的是.subscribeOn
FutureTask<List<String>> future = new FutureTask<>(this::getStudentAslist);
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(future::run);
Observable.from(future)
.subscribeOn(Schedulers.computation())
.flatMapIterable(s -> s)
.subscribe(System.out::println);
System.out.println("marker");
TimeUnit.SECONDS.sleep(3);
供参考:
public List<String> getStudentAslist(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Collections.singletonList("finished");
}
您可以使用 defer 将所有工作移至 Observable
FutureTask<List<String>> future = new FutureTask<>(this::getStudentAslist);
Observable<List<Student>> studentsObservable = Observable
.defer(() -> {
future.run();
return Observable.from(future);
});
然后使用subscribeOn来决定哪个线程将用于这个Observable
studentsObservable
.flatMapIterable(students -> students)
.filter((s) -> s.getAge() > 40)
.subscribeOn(Schedulers.computation())
.subscribe(student -> System.out.println(student));
我有以下模仿阻塞 IO 操作的服务:
public class StudentService{
public List<Student> getStudentAslist(){
Thread.sleep(10000);
.....
return students;
}
}
现在我正在尝试 运行 使用 Java 8 环境 FutureTask<T>
中的此服务:
StudentService studentService = new StudentService();
FutureTask<List<Student>> future = new FutureTask<>(()-> studentService.getStudentAslist());
Worker worker = Schedulers.io().createWorker();
worker.schedule(()->future.run());
Observable<Student> observable2 = Observable.from(future).flatMap(Observable::from);
observable2.filter((s)-> s.getAge()>40).subscribe(student->System.out.println(student));
System.out.println("end");
但是最后的打印 "end" 只在 10 秒后打印,所以看起来我是完全同步的,
我怎样才能运行它反应?
我想你要找的是.subscribeOn
FutureTask<List<String>> future = new FutureTask<>(this::getStudentAslist);
Scheduler.Worker worker = Schedulers.io().createWorker();
worker.schedule(future::run);
Observable.from(future)
.subscribeOn(Schedulers.computation())
.flatMapIterable(s -> s)
.subscribe(System.out::println);
System.out.println("marker");
TimeUnit.SECONDS.sleep(3);
供参考:
public List<String> getStudentAslist(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Collections.singletonList("finished");
}
您可以使用 defer 将所有工作移至 Observable
FutureTask<List<String>> future = new FutureTask<>(this::getStudentAslist);
Observable<List<Student>> studentsObservable = Observable
.defer(() -> {
future.run();
return Observable.from(future);
});
然后使用subscribeOn来决定哪个线程将用于这个Observable
studentsObservable
.flatMapIterable(students -> students)
.filter((s) -> s.getAge() > 40)
.subscribeOn(Schedulers.computation())
.subscribe(student -> System.out.println(student));