RxJava 使用 Observable 和 take 运算符读取文件时出现问题
RxJava Problem with reading a file with Observable and take operator
我的工作环境是JDK1.6和RxJava 2
我想制作一个 Observable,它发出一个项目,该项目是通过 BufferedReader 读取的文件行字符串,如下所示:
...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
return new ObservableSource<String> call() throws Exception {
return new ObservableSource<String>() {
public void subscribe(Observer<String> observer) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(filePath));
String line = null;
while ((line = reader.readLine()) != null) {
observer.onNext(line);
}
observer.onComplete();
... catching exception and close reader
}
}
}
}
});
我还想制作一个 Observer,它使用一个 take(count) 运算符来观察上述 Observable,如下所示:
fileLineObservable.take(2)
.subscribe(new Consumer<String>() {
public void onNext(String line) {
... do something with the file line string
}
});
我在执行上面的代码时遇到了NullPointerException,我知道为什么了。 NPE是由于第二次调用onNext导致在TakeObserver实例上执行onComplete,在onComplete方法内部,调用了未set(null)的upstream.dispose。 TakeObserver 的上游变量应该在它订阅 Observable 时用 onSubscribe(Disposable disposable) 设置。
我该如何解决这个问题?我应该实现自己的 Disposable class 来设置 TakeObserver 的上游吗?
这个解决方案怎么样?
Observable<String> observableFile2(Path path) {
return Observable.using(
() -> Files.newBufferedReader(path),
reader -> {
return Observable.fromIterable(() -> {
return new Iterator<>() {
private String nextLine = null;
@Override
public boolean hasNext() {
try {
nextLine = reader.readLine();
return nextLine != null;
} catch (Exception ex) {
return false;
}
}
@Override
public String next() {
if (nextLine != null) {
return nextLine;
}
throw new IllegalStateException("nextLine can not be null.");
}
};
});
},
BufferedReader::close
);
}
- Observable#using 确保 BufferedReader 在一次性 / onError 上正确关闭
- Observable#fromIterable 包装 readLine 调用并为我们处理 onComplete。
测试
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")
测试
@Test
void name() {
observableFile2(hello).take(2)
.test()
.assertValues("line0", "line1")
.assertComplete();
}
@Test
void name2() {
observableFile2(hello).take(10)
.test()
.assertValues("line0", "line1", "line2", "line3")
.assertComplete();
}
@Test
void name3() {
observableFile2(hello2)
.test()
.assertComplete();
}
我的工作环境是JDK1.6和RxJava 2
我想制作一个 Observable,它发出一个项目,该项目是通过 BufferedReader 读取的文件行字符串,如下所示:
...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
return new ObservableSource<String> call() throws Exception {
return new ObservableSource<String>() {
public void subscribe(Observer<String> observer) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(filePath));
String line = null;
while ((line = reader.readLine()) != null) {
observer.onNext(line);
}
observer.onComplete();
... catching exception and close reader
}
}
}
}
});
我还想制作一个 Observer,它使用一个 take(count) 运算符来观察上述 Observable,如下所示:
fileLineObservable.take(2)
.subscribe(new Consumer<String>() {
public void onNext(String line) {
... do something with the file line string
}
});
我在执行上面的代码时遇到了NullPointerException,我知道为什么了。 NPE是由于第二次调用onNext导致在TakeObserver实例上执行onComplete,在onComplete方法内部,调用了未set(null)的upstream.dispose。 TakeObserver 的上游变量应该在它订阅 Observable 时用 onSubscribe(Disposable disposable) 设置。
我该如何解决这个问题?我应该实现自己的 Disposable class 来设置 TakeObserver 的上游吗?
这个解决方案怎么样?
Observable<String> observableFile2(Path path) {
return Observable.using(
() -> Files.newBufferedReader(path),
reader -> {
return Observable.fromIterable(() -> {
return new Iterator<>() {
private String nextLine = null;
@Override
public boolean hasNext() {
try {
nextLine = reader.readLine();
return nextLine != null;
} catch (Exception ex) {
return false;
}
}
@Override
public String next() {
if (nextLine != null) {
return nextLine;
}
throw new IllegalStateException("nextLine can not be null.");
}
};
});
},
BufferedReader::close
);
}
- Observable#using 确保 BufferedReader 在一次性 / onError 上正确关闭
- Observable#fromIterable 包装 readLine 调用并为我们处理 onComplete。
测试
testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")
测试
@Test
void name() {
observableFile2(hello).take(2)
.test()
.assertValues("line0", "line1")
.assertComplete();
}
@Test
void name2() {
observableFile2(hello).take(10)
.test()
.assertValues("line0", "line1", "line2", "line3")
.assertComplete();
}
@Test
void name3() {
observableFile2(hello2)
.test()
.assertComplete();
}