在 Java 中使用带有反应性包装器的非阻塞 IO 逐行读取文件

Read file line by line using non-blocking IO with reactive wrapper in Java

有什么方法可以逐行读取本地文件而不阻塞任何线程(后台线程池算作阻塞) 使用内置的 CompletableFuture 或类似的反应流库RxJava 还是 Reactor?

(有趣的是,有许多用于 HTTP 和不同数据库(如 Mongo、Redis 等)的非阻塞 IO 库,但我无法找到任何用于简单文件读取的内容。)

还有类似问题:

  • Why FileChannel in Java is not non-blocking?
  • Non-blocking file IO in Java

Java没有通用的非阻塞文件IO的主要原因如下:Java是一门跨平台的语言,但是Unix没有非阻塞访问文件的能力。

如果您为 Windows 编程,则有一个特定于平台的实现 WindowsAsynchronousFileChannelImpl,它使用非阻塞机制。

尽管我从 Alexander 那里得到了问题的答案,但我还是想分享一下我发现的逐行读取文件的最具反应性的方法。两种解决方案都在幕后使用 AsynchronousFileChannel,这并不总是 non-blocking,但仍然可以在反应环境中使用,因为它使用专用线程池进行 IO 工作。

使用来自 Spring Framework 的实用程序(WebFlux 应用程序中的理想选择)

import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AsyncFileRead {
    public Flux<String> lines() {
        StringDecoder stringDecoder = StringDecoder.textPlainOnly();

        return DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(Path.of("test/sample.txt"),
                StandardOpenOption.READ), DefaultDataBufferFactory.sharedInstance, 4096)
            .transform(dataBufferFlux -> stringDecoder.decode(dataBufferFlux, null, null, null));
    }
}

使用 RxIo

import org.javaync.io.AsyncFiles;
import reactor.core.publisher.Flux;

import java.nio.file.Path;

public class AsyncFileRead {
    public Flux<String> lines() {
        return Flux.from(AsyncFiles.lines(Path.of("test/sample.txt")));
    }
}