RxJava 文件和运算符链接

RxJava-file and operator chaining

我正在尝试使用 RxJava-File:

响应式跟踪日志文件
    File file = new File(".\server.log");
    Observable<String> newLines =
            FileObservable.tailer()
                    .file(file)
                    .startPosition(file.length())
                    .sampleTimeMs(1000)
                    .chunkSize(8192)
                    .utf8()
                    .tailText();

    newLines.subscribe(System.out::println);

并且它按预期工作。 但是,一旦我尝试链接更多的运算符,我就会遇到问题。例如,更改为

  newLines.filter(LogfileWatcher::error).subscribe(System.out::println);

(其中 error() 是一个简单函数 String -> Boolean)我仅在第一次追加到文件后才获得输出,但随后的追加则没有。 使用 window() 或其他几个运算符时会出现类似问题。

我做错了什么?

必须在 rxjava-file 中修复背压支持,据报道您的测试用例从 rxjava-file 0.3 开始工作.3 在 Maven Central 上。