在 Java 中使用带有反应性包装器的非阻塞 IO 逐行读取文件
Read file line by line using non-blocking IO with reactive wrapper in Java
有什么方法可以逐行读取本地文件而不阻塞任何线程(后台线程池算作阻塞) 使用内置的 CompletableFuture 或类似的反应流库RxJava 还是 Reactor?
(有趣的是,有许多用于 HTTP 和不同数据库(如 Mongo、Redis 等)的非阻塞 IO 库,但我无法找到任何用于简单文件读取的内容。)
还有类似问题:
- Why FileChannel in Java is not non-blocking?
- Non-blocking file IO in Java
Java没有通用的非阻塞文件IO的主要原因如下:Java是一门跨平台的语言,但是Unix没有非阻塞访问文件的能力。
如果您为 Windows 编程,则有一个特定于平台的实现 WindowsAsynchronousFileChannelImpl,它使用非阻塞机制。
尽管我从 Alexander 那里得到了问题的答案,但我还是想分享一下我发现的逐行读取文件的最具反应性的方法。两种解决方案都在幕后使用 AsynchronousFileChannel
,这并不总是 non-blocking,但仍然可以在反应环境中使用,因为它使用专用线程池进行 IO 工作。
使用来自 Spring Framework 的实用程序(WebFlux 应用程序中的理想选择)
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class AsyncFileRead {
public Flux<String> lines() {
StringDecoder stringDecoder = StringDecoder.textPlainOnly();
return DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(Path.of("test/sample.txt"),
StandardOpenOption.READ), DefaultDataBufferFactory.sharedInstance, 4096)
.transform(dataBufferFlux -> stringDecoder.decode(dataBufferFlux, null, null, null));
}
}
使用 RxIo 库
import org.javaync.io.AsyncFiles;
import reactor.core.publisher.Flux;
import java.nio.file.Path;
public class AsyncFileRead {
public Flux<String> lines() {
return Flux.from(AsyncFiles.lines(Path.of("test/sample.txt")));
}
}
有什么方法可以逐行读取本地文件而不阻塞任何线程(后台线程池算作阻塞) 使用内置的 CompletableFuture 或类似的反应流库RxJava 还是 Reactor?
(有趣的是,有许多用于 HTTP 和不同数据库(如 Mongo、Redis 等)的非阻塞 IO 库,但我无法找到任何用于简单文件读取的内容。)
还有类似问题:
- Why FileChannel in Java is not non-blocking?
- Non-blocking file IO in Java
Java没有通用的非阻塞文件IO的主要原因如下:Java是一门跨平台的语言,但是Unix没有非阻塞访问文件的能力。
如果您为 Windows 编程,则有一个特定于平台的实现 WindowsAsynchronousFileChannelImpl,它使用非阻塞机制。
尽管我从 Alexander 那里得到了问题的答案,但我还是想分享一下我发现的逐行读取文件的最具反应性的方法。两种解决方案都在幕后使用 AsynchronousFileChannel
,这并不总是 non-blocking,但仍然可以在反应环境中使用,因为它使用专用线程池进行 IO 工作。
使用来自 Spring Framework 的实用程序(WebFlux 应用程序中的理想选择)
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class AsyncFileRead {
public Flux<String> lines() {
StringDecoder stringDecoder = StringDecoder.textPlainOnly();
return DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(Path.of("test/sample.txt"),
StandardOpenOption.READ), DefaultDataBufferFactory.sharedInstance, 4096)
.transform(dataBufferFlux -> stringDecoder.decode(dataBufferFlux, null, null, null));
}
}
使用 RxIo 库
import org.javaync.io.AsyncFiles;
import reactor.core.publisher.Flux;
import java.nio.file.Path;
public class AsyncFileRead {
public Flux<String> lines() {
return Flux.from(AsyncFiles.lines(Path.of("test/sample.txt")));
}
}