RxJava 使用 Observable 和 take 运算符读取文件时出现问题

RxJava Problem with reading a file with Observable and take operator

我的工作环境是JDK1.6和RxJava 2

我想制作一个 Observable,它发出一个项目,该项目是通过 BufferedReader 读取的文件行字符串,如下所示:

...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
    return new ObservableSource<String> call() throws Exception {
        return new ObservableSource<String>() {
            public void subscribe(Observer<String> observer) {
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(filePath));
                    String line = null;
                    while ((line = reader.readLine()) != null) {
                        observer.onNext(line);
                    }
                    observer.onComplete();

                    ... catching exception and close reader
                }
            }
        }
    }
});

我还想制作一个 Observer,它使用一个 take(count) 运算符来观察上述 Observable,如下所示:

fileLineObservable.take(2)
                  .subscribe(new Consumer<String>() {
                       public void onNext(String line) {
                           ... do something with the file line string
                       }
                  });

我在执行上面的代码时遇到了NullPointerException,我知道为什么了。 NPE是由于第二次调用onNext导致在TakeObserver实例上执行onComplete,在onComplete方法内部,调用了未set(null)的upstream.dispose。 TakeObserver 的上游变量应该在它订阅 Observable 时用 onSubscribe(Disposable disposable) 设置。

我该如何解决这个问题?我应该实现自己的 Disposable class 来设置 TakeObserver 的上游吗?

这个解决方案怎么样?

Observable<String> observableFile2(Path path) {
        return Observable.using(
                () -> Files.newBufferedReader(path),
                reader -> {
                    return Observable.fromIterable(() -> {
                        return new Iterator<>() {
                            private String nextLine = null;

                            @Override
                            public boolean hasNext() {
                                try {
                                    nextLine = reader.readLine();
                                    return nextLine != null;
                                } catch (Exception ex) {
                                    return false;
                                }
                            }

                            @Override
                            public String next() {
                                if (nextLine != null) {
                                    return nextLine;
                                }
                                throw new IllegalStateException("nextLine can not be null.");
                            }
                        };
                    });
                },
                BufferedReader::close
        );
    }
  • Observable#using 确保 BufferedReader 在一次性 / onError 上正确关闭
  • Observable#fromIterable 包装 readLine 调用并为我们处理 onComplete。

测试

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")

测试

@Test
void name() {
    observableFile2(hello).take(2)
            .test()
            .assertValues("line0", "line1")
            .assertComplete();
}

@Test
void name2() {
    observableFile2(hello).take(10)
            .test()
            .assertValues("line0", "line1", "line2", "line3")
            .assertComplete();
}

@Test
void name3() {
    observableFile2(hello2)
            .test()
            .assertComplete();
}