RxJava 和 rx.exceptions.MissingBackpressureException 异常
RxJava and rx.exceptions.MissingBackpressureException exception
我正在尝试让一个非常基本的基于 RxJava 的应用程序运行。我定义了以下 Observable class,它从文件中读取 returns 行:
public Observable<String> getObservable() throws IOException
{
return Observable.create(subscribe -> {
InputStream in = getClass().getResourceAsStream("/trial.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null)
{
subscribe.onNext(line);
}
} catch (IOException e) {
subscribe.onError(e);
}
finally {
subscribe.onCompleted();
}
});
}
接下来我定义了订阅者代码:
public static void main(String[] args) throws IOException, InterruptedException {
Thread thread = new Thread(() ->
{
RxObserver observer = new RxObserver();
try {
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> System.out.println(t),
() -> System.out.println("Completed"));
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
该文件有近 50000 条记录。当 运行 我得到的应用 "rx.exceptions.MissingBackpressureException"。我已经阅读了一些文档,并且按照建议,我尝试在调用链中添加“.onBackpressureBuffer()”方法。但是后来我没有得到异常,但完成的调用也没有被解雇。
处理我们有一个快速生成的 Observable 的场景的正确方法是什么?
这里的问题是 observeOn 运算符,因为每个 Observer 的 onNext() 调用都被安排在一个单独的线程上调用,所以您的 Observable 会继续在循环中产生这些预定的调用,无论订户 (observeOn) 容量。
如果你保持同步,Observable 将不会发出下一个元素,直到订阅者完成前一个元素,因为它都是在一个线程上完成的,你不会再有背压问题。
如果您仍想使用 observeOn,您将必须在 Observable 的 OnSubscribe#call 方法中实现背压逻辑
第一个问题是您的 readLine 逻辑忽略了背压。您可以在 observeOn
之前应用 onBackpressureBuffer()
开始,但最近添加了 SyncOnSubscribe
让您一个一个地生成值并处理背压:
SyncOnSubscribe.createSingleState(() => {
try {
InputStream in = getClass().getResourceAsStream("/trial.txt");
return new BufferedReader(new InputStreamReader(in));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
},
(s, o) -> {
try {
String line = s.readLine();
if (line == null) {
o.onCompleted();
} else {
o.onNext(line);
}
} catch (IOException ex) {
s.onError(ex);
}
},
s -> {
try {
s.close();
} catch (IOException ex) {
}
});
第二个问题是你的线程将在 io 线程上的所有元素都被传递之前完成,因此主程序退出。删除 observeOn
、添加 .toBlocking
或使用 CountDownLatch
.
RxObserver observer = new RxObserver();
try {
CountDownLatch cdl = new CountDownLatch(1);
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> { System.out.println(t); cdl.countDown(); },
() -> { System.out.println("Completed"); cdl.countDown(); });
cdl.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
我正在尝试让一个非常基本的基于 RxJava 的应用程序运行。我定义了以下 Observable class,它从文件中读取 returns 行:
public Observable<String> getObservable() throws IOException
{
return Observable.create(subscribe -> {
InputStream in = getClass().getResourceAsStream("/trial.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null)
{
subscribe.onNext(line);
}
} catch (IOException e) {
subscribe.onError(e);
}
finally {
subscribe.onCompleted();
}
});
}
接下来我定义了订阅者代码:
public static void main(String[] args) throws IOException, InterruptedException {
Thread thread = new Thread(() ->
{
RxObserver observer = new RxObserver();
try {
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> System.out.println(t),
() -> System.out.println("Completed"));
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
该文件有近 50000 条记录。当 运行 我得到的应用 "rx.exceptions.MissingBackpressureException"。我已经阅读了一些文档,并且按照建议,我尝试在调用链中添加“.onBackpressureBuffer()”方法。但是后来我没有得到异常,但完成的调用也没有被解雇。
处理我们有一个快速生成的 Observable 的场景的正确方法是什么?
这里的问题是 observeOn 运算符,因为每个 Observer 的 onNext() 调用都被安排在一个单独的线程上调用,所以您的 Observable 会继续在循环中产生这些预定的调用,无论订户 (observeOn) 容量。
如果你保持同步,Observable 将不会发出下一个元素,直到订阅者完成前一个元素,因为它都是在一个线程上完成的,你不会再有背压问题。
如果您仍想使用 observeOn,您将必须在 Observable 的 OnSubscribe#call 方法中实现背压逻辑
第一个问题是您的 readLine 逻辑忽略了背压。您可以在 observeOn
之前应用 onBackpressureBuffer()
开始,但最近添加了 SyncOnSubscribe
让您一个一个地生成值并处理背压:
SyncOnSubscribe.createSingleState(() => {
try {
InputStream in = getClass().getResourceAsStream("/trial.txt");
return new BufferedReader(new InputStreamReader(in));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
},
(s, o) -> {
try {
String line = s.readLine();
if (line == null) {
o.onCompleted();
} else {
o.onNext(line);
}
} catch (IOException ex) {
s.onError(ex);
}
},
s -> {
try {
s.close();
} catch (IOException ex) {
}
});
第二个问题是你的线程将在 io 线程上的所有元素都被传递之前完成,因此主程序退出。删除 observeOn
、添加 .toBlocking
或使用 CountDownLatch
.
RxObserver observer = new RxObserver();
try {
CountDownLatch cdl = new CountDownLatch(1);
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> { System.out.println(t); cdl.countDown(); },
() -> { System.out.println("Completed"); cdl.countDown(); });
cdl.await();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}