模式匹配和 RDD

Pattern matching and RDDs

我有一个非常简单的 (n00b) 问题,但不知何故卡住了。我正在尝试使用 wholeTextFiles 读取 Spark 中的一组文件,并希望 return 和 RDD[LogEntry],其中 LogEntry 只是一个案例 class。我想以有效条目的 RDD 结束,我需要使用正则表达式来提取我的案例 class 的参数。当条目无效时,我不希望提取器逻辑失败,而只是在日志中写入一个条目。为此,我使用 LazyLogging。

object LogProcessors extends LazyLogging {

  def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = {

    val pattern = "<some pattern>".r

    val logs = sc.wholeTextFiles(path, numPartitions)
    val entries = logs.map(fileContent => {
      val file = fileContent._1
      val content = fileContent._2
      content.split("\r?\n").map(line => line match {
        case pattern(dt, ev, seq) => Some(LogEntry(<...>))
        case _ => logger.error(s"Cannot parse $file: $line"); None
      })
    })

这给了我一个 RDD[Array[Option[LogEntry]]]。有没有一种巧妙的方法来结束 LogEntrys 的 RDD?我有点想念它。

我正在考虑改用 Try,但我不确定那样是否更好。

非常感谢您的想法。

  1. 要摆脱 Array - 只需将 map 命令替换为 flatMap - flatMap 将处理 Traversable[T] 类型的结果每条记录作为 T.

  2. 类型的单独记录
  3. 去掉 Option - collect 只有成功的:entries.collect { case Some(entry) => entry }。 请注意,此 collect(p: PartialFunction) 重载(执行与 mapfilter 组合等效的操作)与 collect()(将所有数据发送到驱动程序)非常不同。

总而言之,这将是这样的:

def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[CleaningLogEntry] = {

  val pattern = "<some pattern>".r

  val logs = sc.wholeTextFiles(path, numPartitions)
  val entries = logs.flatMap(fileContent => {
    val file = fileContent._1
    val content = fileContent._2
    content.split("\r?\n").map(line => line match {
      case pattern(dt, ev, seq) => Some(LogEntry(<...>))
      case _ => logger.error(s"Cannot parse $file: $line"); None
    })
  })

  entries.collect { case Some(entry) => entry }
}