Spark 1.6.1:从 RDD[Array[Error]] 创建 DataFrame
Spark 1.6.1: creating DataFrame from RDD[Array[Error]]
在我正在编写的 Scala 应用程序中尝试创建 DataFrame 时,我 运行 遇到了问题。
我遇到的问题是编译 scala 时出现错误,toDF 不是 RDD 的一部分。我看到的答案建议将 case class 定义移出 main 并在 sqlContext 声明后导入隐式,但即使那样对我也不起作用。
这是我目前拥有的:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object ErrorParser {
case class Error(time: String, status: String, statusType: String, host: String, message: String)
def splitError(line: String) : Array[String] = {
var array:Array[String] = new Array[String](5)
...
return array
}
def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
...
return filteredErrors.toArray
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
val filteredErrors = filterErrors(errors1.collect)
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
}
}
我很困惑,因为在 spark-shell 中事情是这样的。
我还看到一些答案建议将 RDD 更改为 RDD[Row] 的实例,然后使用
sc.createDataFrame(rdd, scheme)
但我想不通我会怎么做。
如有任何帮助,我们将不胜感激!
这是我的 .sbt 文件:
name := "ErrorParserAPI"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.10" % "1.6.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)
编辑:打字错误
我刚刚复制了您的代码并粘贴到我的 eclipse 中,它工作正常,没有任何编译错误。如果您使用的是 eclipse ,您可以尝试清理和刷新您的项目。
import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ErrorParser {
def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
return filteredErrors.toArray
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
val filteredErrors = filterErrors(errors1.collect)
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
}
case class Error(time: String, status: String, statusType: String, host: String, message: String)
def splitError(line: String): Array[String] = {
var array: Array[String] = new Array[String](5)
return array
}
}
在我正在编写的 Scala 应用程序中尝试创建 DataFrame 时,我 运行 遇到了问题。
我遇到的问题是编译 scala 时出现错误,toDF 不是 RDD 的一部分。我看到的答案建议将 case class 定义移出 main 并在 sqlContext 声明后导入隐式,但即使那样对我也不起作用。
这是我目前拥有的:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object ErrorParser {
case class Error(time: String, status: String, statusType: String, host: String, message: String)
def splitError(line: String) : Array[String] = {
var array:Array[String] = new Array[String](5)
...
return array
}
def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
...
return filteredErrors.toArray
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
val filteredErrors = filterErrors(errors1.collect)
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
}
}
我很困惑,因为在 spark-shell 中事情是这样的。
我还看到一些答案建议将 RDD 更改为 RDD[Row] 的实例,然后使用
sc.createDataFrame(rdd, scheme)
但我想不通我会怎么做。
如有任何帮助,我们将不胜感激!
这是我的 .sbt 文件:
name := "ErrorParserAPI"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.10" % "1.6.1",
"org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)
编辑:打字错误
我刚刚复制了您的代码并粘贴到我的 eclipse 中,它工作正常,没有任何编译错误。如果您使用的是 eclipse ,您可以尝试清理和刷新您的项目。
import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ErrorParser {
def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
return filteredErrors.toArray
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
val filteredErrors = filterErrors(errors1.collect)
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
}
case class Error(time: String, status: String, statusType: String, host: String, message: String)
def splitError(line: String): Array[String] = {
var array: Array[String] = new Array[String](5)
return array
}
}