observeOn 的放置给了我不同的行为
Placement of observeOn gives me different behavior
当我 运行 以下片段时,我没有看到背压。
public static void main(String[] args) throws InterruptedException {
MyFileProcessor pro = new MyFileProcessor();
Timer t = new Timer();
t.start();
Disposable x = pro
.generateFlowable(
new File("path\to\file.raw"))
.subscribeOn(Schedulers.io(), false).observeOn(Schedulers.io()).map(y -> {
System.out.println(Thread.currentThread().getName() + " xxx");
return y;
})
.subscribe(onNext -> {
System.out.println(Thread.currentThread().getName() + " " + new String(onNext));
Thread.sleep(100);
}, Throwable::printStackTrace, () -> {
System.out.println("Done");
t.end();
System.out.println(t.getTotalTime());
});
Thread.sleep(1000000000);
}
当我 运行 上面的 class 时,我得到
的交替行
RxCachedThreadScheduler-1 xxx
RxCachedThreadScheduler-1 Line1
....
它使用相同的线程。
现在,当我将 observeOn 移动到订阅之前时,我看到了一堆
RxCachedThreadScheduler-1 xxx
后面跟着一堆
RxCachedThreadScheduler-1 Line1
我假设这是背压,但使用的线程仍然相同。
为什么我会看到这种行为?
为什么只使用了一个线程?
没有可供 observeOn 运行的运算符,那么为什么我会看到这种行为?
[编辑]
public Flowable<byte[]> generateFlowable(File file) {
return Flowable.generate(() -> new BufferedInputStream(new FileInputStream(file)), (bufferedIs, output) -> {
try {
byte[] data = getMessageRawData(bufferedIs);
if (data != null)
output.onNext(data);
else
output.onComplete();
}
catch (Exception e) {
output.onError(e);
}
return bufferedIs;
}, bufferedIs -> {
try {
bufferedIs.close();
}
catch (IOException ex) {
RxJavaPlugins.onError(ex);
}
});
}
Why is only one thread being utilized?
工作正常,因为您检查了 运行 线程 after observeOn
,因此您应该在此处和下方看到相同的线程,无论如何发生在它上面。 subscribeOn
影响 generateFlowable
,我想,您不会打印当前线程,因此您看不到它在不同的 IO 线程上运行。
Now when I move the observeOn to just before the subscribe
除非 generateFlowable
.
中发生奇怪的事情,否则应该没有任何区别
当我 运行 以下片段时,我没有看到背压。
public static void main(String[] args) throws InterruptedException {
MyFileProcessor pro = new MyFileProcessor();
Timer t = new Timer();
t.start();
Disposable x = pro
.generateFlowable(
new File("path\to\file.raw"))
.subscribeOn(Schedulers.io(), false).observeOn(Schedulers.io()).map(y -> {
System.out.println(Thread.currentThread().getName() + " xxx");
return y;
})
.subscribe(onNext -> {
System.out.println(Thread.currentThread().getName() + " " + new String(onNext));
Thread.sleep(100);
}, Throwable::printStackTrace, () -> {
System.out.println("Done");
t.end();
System.out.println(t.getTotalTime());
});
Thread.sleep(1000000000);
}
当我 运行 上面的 class 时,我得到
的交替行RxCachedThreadScheduler-1 xxx
RxCachedThreadScheduler-1 Line1
....
它使用相同的线程。
现在,当我将 observeOn 移动到订阅之前时,我看到了一堆
RxCachedThreadScheduler-1 xxx
后面跟着一堆
RxCachedThreadScheduler-1 Line1
我假设这是背压,但使用的线程仍然相同。
为什么我会看到这种行为?
为什么只使用了一个线程?
没有可供 observeOn 运行的运算符,那么为什么我会看到这种行为?
[编辑]
public Flowable<byte[]> generateFlowable(File file) {
return Flowable.generate(() -> new BufferedInputStream(new FileInputStream(file)), (bufferedIs, output) -> {
try {
byte[] data = getMessageRawData(bufferedIs);
if (data != null)
output.onNext(data);
else
output.onComplete();
}
catch (Exception e) {
output.onError(e);
}
return bufferedIs;
}, bufferedIs -> {
try {
bufferedIs.close();
}
catch (IOException ex) {
RxJavaPlugins.onError(ex);
}
});
}
Why is only one thread being utilized?
工作正常,因为您检查了 运行 线程 after observeOn
,因此您应该在此处和下方看到相同的线程,无论如何发生在它上面。 subscribeOn
影响 generateFlowable
,我想,您不会打印当前线程,因此您看不到它在不同的 IO 线程上运行。
Now when I move the observeOn to just before the subscribe
除非 generateFlowable
.