如何将 Scala 流的内容写入文件?
How to write the contents of a Scala stream to a file?
我有一个要写入文件的 Scala 字节流。流中的数据太多,无法缓冲所有内存。
作为第一次尝试,我创建了一个类似于此的 InputStream
:
class MyInputStream(data: Stream[Byte]) extends InputStream {
private val iterator = data.iterator
override def read(): Int = if (iterator.hasNext) iterator.next else -1
}
然后我用Apache Commons写文件:
val source = new MyInputStream(dataStream)
val target = new FileOutputStream(file)
try {
IOUtils.copy(source, target)
} finally {
target.close
}
这可行,但我对性能不太满意。我猜想为每个字节调用 MyInputStream.read
会引入很多开销。有没有更好的方法?
您应该在 InputStream 实现中实现批量读取覆盖:
override def read(b: Array[Byte], off: Int, len: Int)
IOUtils.copy
使用该签名 read/write 4K 块。
我推荐 java.nio.file
包。使用 Files.write
,您可以将 Array
或 Byte
写入由文件名构造的 Path
。
如何提供 Byte
取决于您。您可以使用 .toArray
将 Stream
转换为 Array
,或者您可以一次一个(或少数)从流中取出 take
个字节并将它们转换为数组。
这是演示 .toArray
方法的简单代码块。
import java.nio.file.{Files, Paths}
val filename: String = "output.bin"
val bytes: Stream[Byte] = ...
Files.write(Paths.get(filename), bytes.toArray)
鉴于 StreamIterator
一次读取一个字节可能是瓶颈,我设计了一种将流写入 OutputStream
的方法,该方法不依赖于它并且希望更多高效:
object StreamCopier {
def copy(data: Stream[Byte], output: OutputStream) = {
def write(d: Stream[Byte]): Unit = if (d.nonEmpty) {
val (head, tail) = d.splitAt(4 * 1024)
val bytes = head.toArray
output.write(bytes, 0, bytes.length)
write(tail)
}
write(data)
}
}
编辑: 通过在尾递归 write
函数中用 d
替换 data
修复了一个错误。
此方法通过 splitAt
使用递归方法将流拆分为前 ~4K 和其余部分,将其写入 OutputStream
并在流的尾部递归,直到 splitAt
returns 一个空流。
既然你有性能基准,我会留给你来判断结果是否更有效率。
您可能(也可能不会!)误认为读取端是性能问题的根源。可能是因为您正在使用无缓冲的 FileOutputStream(...),强制为写入的每个字节进行单独的系统调用。
这是我的看法,快速简单:
def writeBytes( data : Stream[Byte], file : File ) = {
val target = new BufferedOutputStream( new FileOutputStream(file) )
try data.foreach( target.write(_) ) finally target.close
}
我有一个要写入文件的 Scala 字节流。流中的数据太多,无法缓冲所有内存。
作为第一次尝试,我创建了一个类似于此的 InputStream
:
class MyInputStream(data: Stream[Byte]) extends InputStream {
private val iterator = data.iterator
override def read(): Int = if (iterator.hasNext) iterator.next else -1
}
然后我用Apache Commons写文件:
val source = new MyInputStream(dataStream)
val target = new FileOutputStream(file)
try {
IOUtils.copy(source, target)
} finally {
target.close
}
这可行,但我对性能不太满意。我猜想为每个字节调用 MyInputStream.read
会引入很多开销。有没有更好的方法?
您应该在 InputStream 实现中实现批量读取覆盖:
override def read(b: Array[Byte], off: Int, len: Int)
IOUtils.copy
使用该签名 read/write 4K 块。
我推荐 java.nio.file
包。使用 Files.write
,您可以将 Array
或 Byte
写入由文件名构造的 Path
。
如何提供 Byte
取决于您。您可以使用 .toArray
将 Stream
转换为 Array
,或者您可以一次一个(或少数)从流中取出 take
个字节并将它们转换为数组。
这是演示 .toArray
方法的简单代码块。
import java.nio.file.{Files, Paths}
val filename: String = "output.bin"
val bytes: Stream[Byte] = ...
Files.write(Paths.get(filename), bytes.toArray)
鉴于 StreamIterator
一次读取一个字节可能是瓶颈,我设计了一种将流写入 OutputStream
的方法,该方法不依赖于它并且希望更多高效:
object StreamCopier {
def copy(data: Stream[Byte], output: OutputStream) = {
def write(d: Stream[Byte]): Unit = if (d.nonEmpty) {
val (head, tail) = d.splitAt(4 * 1024)
val bytes = head.toArray
output.write(bytes, 0, bytes.length)
write(tail)
}
write(data)
}
}
编辑: 通过在尾递归 write
函数中用 d
替换 data
修复了一个错误。
此方法通过 splitAt
使用递归方法将流拆分为前 ~4K 和其余部分,将其写入 OutputStream
并在流的尾部递归,直到 splitAt
returns 一个空流。
既然你有性能基准,我会留给你来判断结果是否更有效率。
您可能(也可能不会!)误认为读取端是性能问题的根源。可能是因为您正在使用无缓冲的 FileOutputStream(...),强制为写入的每个字节进行单独的系统调用。
这是我的看法,快速简单:
def writeBytes( data : Stream[Byte], file : File ) = {
val target = new BufferedOutputStream( new FileOutputStream(file) )
try data.foreach( target.write(_) ) finally target.close
}