How to format an RDD using a regular expression, then store it to mongoDB in Spark
The raw data is just raw web logs, aggregated with Flume and published through Kafka. For example:
60.175.130.12 - - [21/Apr/2018:20:46:35 +0800] "GET /wp-admin/edit.php HTTP/1.1" 200 13347 "http://.....php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_4) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/11.1 Safari/605.1.15"
I want to receive the logs in batches with Spark Streaming and then split each line with a regular expression:
val regex = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$""".r
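For reference, here is a minimal sketch of what the capture groups yield when this pattern is applied to a line like the sample above (the user agent is shortened for brevity; the group-to-field mapping is my reading of the pattern):

val sample = "60.175.130.12 - - [21/Apr/2018:20:46:35 +0800] " +
  "\"GET /wp-admin/edit.php HTTP/1.1\" 200 13347 \"http://.....php\" \"Mozilla/5.0\""

regex.findFirstMatchIn(sample) match {
  case Some(m) =>
    println(m.group(1)) // 60.175.130.12  -> host
    println(m.group(5)) // GET            -> request method
    println(m.group(8)) // 200            -> status code
    println(m.group(9)) // 13347          -> bytes (still a String at this point)
  case None =>
    println("no match")
}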
and turn each line into a database-friendly form:
case class log(
  host: String,
  rfc931: String,
  username: String,
  data_time: String,
  req_method: String,
  req_url: String,
  req_protocol: String,
  statuscode: String,
  bytes: Int,
  referrer: String,
  user_agent: String)
Then each batch is simply appended to mongoDB.
But I ran into problems while splitting the batch:
val lines = stream.flatMap { batch =>
  batch.value().split("\n")
}
val records = lines.map { record =>
  val regex = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$""".r
  // findAllIn returns a lazy MatchIterator; calling group() before the
  // iterator has been advanced throws "No match available"
  val matched = regex.findAllIn(record)
  log(matched.group(1), matched.group(2), matched.group(3), matched.group(4),
    matched.group(5), matched.group(6), matched.group(7), matched.group(8),
    matched.group(9).toInt, matched.group(10), matched.group(11))
}
records.foreachRDD { record =>
  import db.implicits._
  val record_DF = record.toDF()
  record_DF.write.mode("append").mongo()
}
This is how I think it should be implemented: first split the stream into lines, then map each line through the regex into the log format, and finally write it to the DB.
But the program keeps failing with 'No match available', or similar regex-matching errors.
...
I'm a beginner and need some help.
Solved after modification: findAllIn returns a lazy MatchIterator that cannot be queried for groups until it has been advanced, whereas findFirstMatchIn returns an Option[Match] that can be unwrapped directly:
val records = lines.map { record =>
  val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$""".r
  val options = PATTERN.findFirstMatchIn(record)
  val matched = options.get // throws NoSuchElementException if the line does not match
  log(matched.group(1), matched.group(2), matched.group(3), matched.group(4),
    matched.group(5), matched.group(6), matched.group(7), matched.group(8),
    matched.group(9).toInt, matched.group(10), matched.group(11))
}
records.print()
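One caveat: options.get still throws NoSuchElementException on any line the pattern does not match, and group(9).toInt throws when the bytes field is "-" (which the pattern allows). A safer sketch under the same log case class uses flatMap so unmatched lines are silently dropped:

val PATTERN = """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$""".r

val records = lines.flatMap { record =>
  PATTERN.findFirstMatchIn(record).map { m =>
    // "-" is a legal bytes value under the pattern, so default it to 0
    val bytes = if (m.group(9) == "-") 0 else m.group(9).toInt
    log(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5),
      m.group(6), m.group(7), m.group(8), bytes, m.group(10), m.group(11))
  }
}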