如何在 Spark 过滤器函数中收集或存储过滤出的 json
How to collect or store filtered out jsons in Spark filter function
我想存储或收集过滤掉的数据,即未能通过 hdfs 或 hbase 验证的 json。
dstream.filter { data => VitalValidator.isVitalJSONValid(data) }
其中 dstream 是 DStream[String] 并且 isVitalJSONValid 接受字符串并且 returns 布尔值
我会用 Scala 做这样的事情。
def isVitalJSONValid(data: String): Boolean = {
var isValid = false
//peroforms some validation
if(data.equals("some/validation")){
isValid = true
}
!isValid
}
//existing goes on here
dstream.filter(data => isVitalJSONValid(data)).saveAsHadoopFiles("file_prefix")
我想存储或收集过滤掉的数据,即未能通过 hdfs 或 hbase 验证的 json。
dstream.filter { data => VitalValidator.isVitalJSONValid(data) }
其中 dstream 是 DStream[String] 并且 isVitalJSONValid 接受字符串并且 returns 布尔值
我会用 Scala 做这样的事情。
def isVitalJSONValid(data: String): Boolean = {
var isValid = false
//peroforms some validation
if(data.equals("some/validation")){
isValid = true
}
!isValid
}
//existing goes on here
dstream.filter(data => isVitalJSONValid(data)).saveAsHadoopFiles("file_prefix")