如何将 Scala 流的内容写入文件?

How to write the contents of a Scala stream to a file?

我有一个要写入文件的 Scala 字节流。流中的数据太多,无法缓冲所有内存。

作为第一次尝试,我创建了一个类似于此的 InputStream

class MyInputStream(data: Stream[Byte]) extends InputStream {
  private val iterator = data.iterator
  override def read(): Int = if (iterator.hasNext) iterator.next else -1
}

然后我用Apache Commons写文件:

val source = new MyInputStream(dataStream)
val target = new FileOutputStream(file)
try {
  IOUtils.copy(source, target)
} finally {
  target.close
}

这可行,但我对性能不太满意。我猜想为每个字节调用 MyInputStream.read 会引入很多开销。有没有更好的方法?

您应该在 InputStream 实现中实现批量读取覆盖:

override def read(b: Array[Byte], off: Int, len: Int)

IOUtils.copy 使用该签名 read/write 4K 块。

我推荐 java.nio.file 包。使用 Files.write,您可以将 ArrayByte 写入由文件名构造的 Path

如何提供 Byte 取决于您。您可以使用 .toArrayStream 转换为 Array,或者您可以一次一个(或少数)从流中取出 take 个字节并将它们转换为数组。

这是演示 .toArray 方法的简单代码块。

import java.nio.file.{Files, Paths}

val filename: String = "output.bin"
val bytes: Stream[Byte] = ...
Files.write(Paths.get(filename), bytes.toArray)

鉴于 StreamIterator 一次读取一个字节可能是瓶颈,我设计了一种将流写入 OutputStream 的方法,该方法不依赖于它并且希望更多高效:

object StreamCopier {
  def copy(data: Stream[Byte], output: OutputStream) = {
    def write(d: Stream[Byte]): Unit = if (d.nonEmpty) {
      val (head, tail) = d.splitAt(4 * 1024)
      val bytes = head.toArray
      output.write(bytes, 0, bytes.length)
      write(tail)
    }
    write(data)
  }
}

编辑: 通过在尾递归 write 函数中用 d 替换 data 修复了一个错误。

此方法通过 splitAt 使用递归方法将流拆分为前 ~4K 和其余部分,将其写入 OutputStream 并在流的尾部递归,直到 splitAt returns 一个空流。

既然你有性能基准,我会留给你来判断结果是否更有效率。

您可能(也可能不会!)误认为读取端是性能问题的根源。可能是因为您正在使用无缓冲的 FileOutputStream(...),强制为写入的每个字节进行单独的系统调用。

这是我的看法,快速简单:

def writeBytes( data : Stream[Byte], file : File ) = {
  val target = new BufferedOutputStream( new FileOutputStream(file) )
  try data.foreach( target.write(_) ) finally target.close
}