模式匹配和 RDD
Pattern matching and RDDs
我有一个非常简单的 (n00b) 问题,但不知何故卡住了。我正在尝试使用 wholeTextFiles
读取 Spark 中的一组文件,并希望 return 和 RDD[LogEntry]
,其中 LogEntry
只是一个案例 class。我想以有效条目的 RDD 结束,我需要使用正则表达式来提取我的案例 class 的参数。当条目无效时,我不希望提取器逻辑失败,而只是在日志中写入一个条目。为此,我使用 LazyLogging。
object LogProcessors extends LazyLogging {
def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = {
val pattern = "<some pattern>".r
val logs = sc.wholeTextFiles(path, numPartitions)
val entries = logs.map(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\r?\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(<...>))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
这给了我一个 RDD[Array[Option[LogEntry]]]
。有没有一种巧妙的方法来结束 LogEntry
s 的 RDD?我有点想念它。
我正在考虑改用 Try
,但我不确定那样是否更好。
非常感谢您的想法。
要摆脱 Array
- 只需将 map
命令替换为 flatMap
- flatMap 将处理 Traversable[T]
类型的结果每条记录作为 T
.
类型的单独记录
去掉 Option
- collect
只有成功的:entries.collect { case Some(entry) => entry }
。
请注意,此 collect(p: PartialFunction)
重载(执行与 map
和 filter
组合等效的操作)与 collect()
(将所有数据发送到驱动程序)非常不同。
总而言之,这将是这样的:
def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[CleaningLogEntry] = {
val pattern = "<some pattern>".r
val logs = sc.wholeTextFiles(path, numPartitions)
val entries = logs.flatMap(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\r?\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(<...>))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
entries.collect { case Some(entry) => entry }
}
我有一个非常简单的 (n00b) 问题,但不知何故卡住了。我正在尝试使用 wholeTextFiles
读取 Spark 中的一组文件,并希望 return 和 RDD[LogEntry]
,其中 LogEntry
只是一个案例 class。我想以有效条目的 RDD 结束,我需要使用正则表达式来提取我的案例 class 的参数。当条目无效时,我不希望提取器逻辑失败,而只是在日志中写入一个条目。为此,我使用 LazyLogging。
object LogProcessors extends LazyLogging {
def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[Option[CleaningLogEntry]] = {
val pattern = "<some pattern>".r
val logs = sc.wholeTextFiles(path, numPartitions)
val entries = logs.map(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\r?\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(<...>))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
这给了我一个 RDD[Array[Option[LogEntry]]]
。有没有一种巧妙的方法来结束 LogEntry
s 的 RDD?我有点想念它。
我正在考虑改用 Try
,但我不确定那样是否更好。
非常感谢您的想法。
要摆脱
Array
- 只需将map
命令替换为flatMap
- flatMap 将处理Traversable[T]
类型的结果每条记录作为T
. 类型的单独记录
去掉
Option
-collect
只有成功的:entries.collect { case Some(entry) => entry }
。 请注意,此collect(p: PartialFunction)
重载(执行与map
和filter
组合等效的操作)与collect()
(将所有数据发送到驱动程序)非常不同。
总而言之,这将是这样的:
def extractLogs(sc: SparkContext, path: String, numPartitions: Int = 5): RDD[CleaningLogEntry] = {
val pattern = "<some pattern>".r
val logs = sc.wholeTextFiles(path, numPartitions)
val entries = logs.flatMap(fileContent => {
val file = fileContent._1
val content = fileContent._2
content.split("\r?\n").map(line => line match {
case pattern(dt, ev, seq) => Some(LogEntry(<...>))
case _ => logger.error(s"Cannot parse $file: $line"); None
})
})
entries.collect { case Some(entry) => entry }
}