如何从文件系统中获取文件属性流?

How to get Streams of File Attributes from the FileSystem?

我正在编写一个 Web 服务器,并试图确保我尽可能高效,最大限度地减少文件系统调用。问题是 return 流式传输的方法,例如 java.nio.file.Files.list return a Stream of Paths, and I would like to have a Stream of BasicFileAttributes, so that I can return the creation time and update time for each Path (on say returning results for an LDP Container).

当然,一个简单的解决方案是 map Stream 的每个元素都有一个函数,该函数采用路径和 returns 文件属性 (p: Path) => Files.getAttributeView... 但这听起来像是为每个路径调用 FS,这似乎是一种浪费,因为要获取文件信息,JDK 不能远离属性信息。

我实际上是从 2009 OpenJDK mailing list 那里看到这封邮件的,邮件中说他们已经讨论过添加一个 API,这将 return 一对路径和属性...

我在 JDK java.nio.file.FileTreeWalker 上发现了一个非 public class,它有一个 api 允许获取属性 FileTreeWalker.Event。这实际上利用了 sun.nio.fs.BasicFileAttributesHolder 允许路径保留属性的缓存。但它不是 public 并且不清楚它在哪里工作。

当然还有完整的 FileVisitor API,它具有 return 两个 PathBasicFileAttributes 的方法,如下所示:

public FileVisitResult visitFile(Path file, BasicFileAttributes attr) {...}

所以我正在寻找是否有一种方法可以将其转换为 Stream,它尊重 Reactive Manifesto that was pushed by Akka, without it hogging too many resources. I checked the open source Alpakka File 项目的背压原则,但它也在流式传输 Files 方法 return Paths ...

如果有什么方法可以直接获取这些属性,我不知道。

关于将 FileVisitor API 转换为反应流:

reactive streams backpressure 的机制是一个 pull-push 模型,其中需求首先由下游(拉动部分)发出信号,然后上游被允许发送的项目不超过需求发出的信号(推动部分) ).

FileVisitor 的问题API 是无法直接连接这种控制流机制。一旦你启动它,它就会调用你的回调,而不关心其他任何事情。

没有干净的方法来桥接这个,但是你可以做到这一点的一种方法是使用 Source.queue (https://doc.akka.io/docs/akka/current/stream/operators/Source/queue.html) 将 API 与你的流的其余部分隔离开来,比如那:

val queue = 
Source.queue[BasiFileAttributes](bufferSize, OverflowStrategy.backpressure)
      //the rest of your Akka Streams pipeline
      .run(system);

这将进入队列,您现在可以将其传递给 FileVisitor。您 offer 到该队列的任何内容都将顺流而下。如果当您 offer 没有需求并且队列已满时,offer 返回的 Future 将不会完成,直到队列中有 space 为止。所以在 API 你可以简单地做:

//inside the FileVisitor API callback
Await.result(queue.offer(attrs), Duration.Inf)

这会在流被背压时阻塞回调线程。丑陋但与世隔绝。

您可以使用接受 BiPredicate 并在测试每个路径时存储值的 Files.find 访问文件属性及其路径。

BiPredicate 中的副作用操作将启用对两个对象的操作,而无需触及路径中每个项目的文件系统。使用您的谓词条件 yourPred,下面的副作用 predicate 将收集属性供您在流处理中检索:

public static void main(String[] args) throws IOException {
    Path dir = Path.of(args[0]);

    // Use `ConcurrentHashMap` if using `stream.parallel()`
    HashMap <Path,BasicFileAttributes> attrs = new HashMap<>();

    BiPredicate<Path, BasicFileAttributes> yourPred = (p,a) -> true;

    BiPredicate<Path, BasicFileAttributes> predicate = (p,a) -> {
        return yourPred.test(p, a)
                // && p.getNameCount() == dir.getNameCount()+1 // Simulates Files.list
                && attrs.put(p, a) == null;
    };
    try(var stream = Files.find(dir, Integer.MAX_VALUE, predicate)) {
        stream.forEach(p-> System.out.println(p.toString()+" => "+attrs.get(p)));
        // Or: if your put all your handling code in the predicate use stream.count();
    }
}

要模拟 File.list 的效果,请使用一级 find 扫描器:

 BiPredicate<Path, BasicFileAttributes> yourPred = (p,a) -> p.getNameCount() == dir.getNameCount()+1;

对于大型文件夹扫描,您应该通过在使用路径后插入 attrs.remove(p); 来清理 attrs 映射。

编辑

上面的答案可以重构为 Map.Entry<Path, BasicFileAttributes> 的 3 行调用 returning 流,或者很容易添加 class/record 来保存 Path/BasicFileAttribute 对而 return Stream<PathInfo> 改为:

/**
 * Call Files.find() returning a stream with both Path+BasicFileAttributes
 * as type Map.Entry<Path, BasicFileAttributes>
 * <p>Could declare a specific record to replace Map.Entry as:
 *    record PathInfo(Path path, BasicFileAttributes attr) { };
 */
public static Stream<Map.Entry<Path, BasicFileAttributes>>
find(Path dir, int maxDepth, BiPredicate<Path, BasicFileAttributes> matcher, FileVisitOption... options) throws IOException {

    HashMap <Path,BasicFileAttributes> attrs = new HashMap<>();
    BiPredicate<Path, BasicFileAttributes> predicate = (p,a) -> (matcher == null || matcher.test(p, a)) && attrs.put(p, a) == null;

    return Files.find(dir, maxDepth, predicate, options).map(p -> Map.entry(p, attrs.remove(p)));
}

从 DuncG 的回答开始,我得到了以下在 Scala3 中工作的非常通用的 Akka Stream class。这实际上非常巧妙,因为它产生了 Files.find 函数的副作用,它立即封装回一个干净的功能反应流。

class DirectoryList(
    dir: Path, 
    matcher: (Path, BasicFileAttributes) => Boolean = (p,a) => true, 
    maxDepth: Int = 1
) extends GraphStage[SourceShape[(Path,BasicFileAttributes)]]:
    import scala.jdk.FunctionConverters.*
    import scala.jdk.OptionConverters.*
    
    val out: Outlet[(Path,BasicFileAttributes)] = Outlet("PathAttributeSource")
    override val shape = SourceShape(out)


    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
            private var next: (Path,BasicFileAttributes) = _

            def append(path: Path, att: BasicFileAttributes): Boolean = 
                val matched = matcher(path,att)
                if matched then next = (path,att)
                matched
            
            private val pathStream = Files.find(dir, maxDepth, append.asJava)
            private val sit = pathStream.iterator()
            
            setHandler(out, new OutHandler {
                override def onPull(): Unit = { 
                    if sit.hasNext then
                        sit.next()
                        push(out,next)
                    else
                        pathStream.close()  
                        complete(out)
                }

                override def onDownstreamFinish(cause: Throwable): Unit =
                    pathStream.close()  
                    super.onDownstreamFinish(cause)
            })
        }
end DirectoryList

然后可以按如下方式使用:

val sourceGraph = DirectoryList(Path.of("."),depth=10)
val result = Source.fromGraph(sourceGraph).map{ (p: Path,att: BasicFileAttributes) => 
        println(s"received <$p> : dir=${att.isDirectory}")}.run()

完整的源代码is here on github and an initial test here。 也许可以通过调整答案来改进它,以便批量传递一定数量的路径属性对。