如何从 InputStream 创建基于行的可观察对象?

How to create a line-based observable from InputStream?

抱歉这个基本问题... 我有一个函数,它接受一个包含文件内容的 InputStream 和 returns 一个对象列表,比方说 Person。

输入文件的每一行都有一个人,所以我想逐行解析。 没什么难的,但是...这次我想使用响应式编程。

类似于:

public List<Person> parse(final InputStream is) throws IOException {
    return
    //create an observable wich will split the input in many lines, "\n"
            .map(Person::new)
            .collect(toList());
}

我错过了注释的步骤,即:创建一个不是基于字节而是基于行的可观察对象。

您可以使用 BufferedReaderlines 方法创建 Stringstream:

Returns a Stream, the elements of which are lines read from this BufferedReader.

使用与此类似的代码:

Stream<String> lines = new BufferedReader(new InputStreamReader(is, cs)).lines();

所以你的代码应该是这样的:

public List<Person> parse(final InputStream is) throws IOException {
    CharSet cs = ... // Use the right charset for your file
    Stream<String> lines = new BufferedReader(new InputStreamReader(is, cs)).lines();
    return  lines
            .map(Person::new)   
            .collect(toList());
}  

根据您的代码,您不需要实现 Observable of lines,因为您之后想使用 Java 的 Streams。您不能从 Rx 返回到 java 流。

但是如果你想完全基于 Observables,你可以从这个开始:

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
            Observable<String> linesObs = Observable.from(reader.lines()::iterator);
            // In rxjava v2:
            // Observable<String> linesObs = Observable.fromIterable(reader.lines()::iterator);
            ...
        }