在 scala 中,是否有一种惯用的方法来处理关闭和重新打开底层流,当它试图用其他流包装时?

In scala, is there an idiomatic way to handle closing and reopening an underlying stream when trying to wrap it with other streams?

我正在尝试打开一个流,尝试将其解压为 gzip,如果失败尝试将其解压为 zlib,然后 return 流以供进一步使用。底层流 必须 在创建包装解压缩流的异常情况下关闭,否则我将 运行 陷入资源耗尽问题。

pbData 是一个标准的、不可重置的 InputStream

必须有更简洁的方法来做到这一点。

    val input = {
      var pb = pbData.open()
      try {
        log.trace("Attempting to create GZIPInputStream")
        new GZIPInputStream(pb)
      } catch {
        case e: ZipException => {
          log.trace("Attempting to create InflaterInputStream")
          pb.close()
          pb = pbData.open()
          try {
            new InflaterInputStream(pb)
          } catch {
            case e: ZipException => {
              pb.close()
              throw e
            }
          }
        }
      }

可能是这样的:

def tryDecompress(pbData: PBData, ds: InputStream => InputStream *): InputStream = {
  def tryIt(s: InputStream, dc: InputStream => InputStream) = try {
    dc(s) 
  } catch { case NonFatal(e) => 
    close(s)
    throw e
  }

  val (first, rest) = ds.head -> ds.tail
  try { 
    tryIt(pbData.open, first) 
  } catch { 
    case _: ZipException if rest.nonEmpty => 
      tryDecompress(pbData, rest)
  }
}

val stream = tryDecompress(pbData, new GZipInputStream(_), new InflaterInputStream(_))

(太糟糕了,scala 的 Try 没有 onFailure ... 如果有,这看起来会好得多:/)

val result: Try[InflaterInputStream] = Try(new GZIPInputStream(pb)) match {
    case res@Success(x) => res
    case Failure(e) => e match {
      case e: ZipException =>
        Try(new InflaterInputStream(pb)) match {
          case res2@Success(x2) => res2
          case Failure(e2) => e match {
            case _ => pb.close()
                      Failure(e2)
          }
        }
      case  ex@_ => pb.close()
                    Failure(ex)
    }
  }

我发现使用 BufferedInputStream 包装底层流,然后在每次解压库尝试之间重置它,看起来很干净。

val bis = new BufferedInputStream(pbData.open())
// allows us to read and reset 16 bytes
bis.mark(16)
val input: InputStream = {
  try {
    log.trace("attempting to open as gzip")
    new GZIPInputStream(bis)
  } catch {
    case e: ZipException => try {
        bis.reset()
        log.trace("attempting to open as zlib")
        new InflaterInputStream(bis)
      } catch {
        case e: ZipException => {
            bis.reset()
            log.trace("attempting to open as uncompressed")
            bis
          }
      }
  }
}

您的过程实际上是对 InputStream 个实例生成器的迭代。检查这个,与许多嵌套的 try-catch'es 相比,更加惯用的解决方案:

val bis = new BufferedInputStream(pbData.open())
// allows us to read and reset 16 bytes, as in your answer
bis.mark(16)

// list of functions, because we need lazy evaluation of streams
val streamGens: List[Any => InputStream] = List(
  _ => new GZIPInputStream(bis),
  _ => new InflaterInputStream(bis),
  _ => bis
)

def firstStreamOf(streamGens: List[Any => InputStream]): Try[InputStream] =
  streamGens match {
    case x :: xs =>
      Try(x()).recoverWith {
        case NonFatal(_) =>
          // reset in case of failure
          bis.reset()
          firstStreamOf(xs)
      }
    case Nil =>
      // shouldn't get to this line because of last streamGens element
      Failure(new Exception)
  }

val tryStream = firstStreamOf(streamGens)

tryStream.foreach { stream =>
  // do something with stream...
}

作为奖励,如果您需要尝试添加更多的流生成器​​,则必须在 streamGens 初始化中添加一行。此外,我们不需要手动添加 bit.reset() 调用。