Spark 1.6.1:从 RDD[Array[Error]] 创建 DataFrame

Spark 1.6.1: creating DataFrame from RDD[Array[Error]]

在我正在编写的 Scala 应用程序中尝试创建 DataFrame 时,我 运行 遇到了问题。

我遇到的问题是编译 scala 时出现错误,toDF 不是 RDD 的一部分。我看到的答案建议将 case class 定义移出 main 并在 sqlContext 声明后导入隐式,但即使那样对我也不起作用。

这是我目前拥有的:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object ErrorParser {

    case class Error(time: String, status: String, statusType: String, host: String, message: String)

    def splitError(line: String) : Array[String] = {

        var array:Array[String] = new Array[String](5)

        ...

        return array

    }

    def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {

        var filteredErrors = ArrayBuffer[Array[String]]()

        ...

        return filteredErrors.toArray
    }

    def main(args: Array[String]) {

        val conf = new SparkConf().setAppName("ErrorParserAPI")
        val sc = new SparkContext(conf)

        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._

        var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
        var errors = logs.filter(line => line.contains("ERROR"))

        val errors1 = errors.map(line => splitError(line))
        val filteredErrors = filterErrors(errors1.collect)

        val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
        val filteredRDD = sc.parallelize(dfErrors)
        var errorDF = filteredRDD.toDF()

        errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")

   }

}

我很困惑,因为在 spark-shell 中事情是这样的。

我还看到一些答案建议将 RDD 更改为 RDD[Row] 的实例,然后使用

sc.createDataFrame(rdd, scheme)

但我想不通我会怎么做。

如有任何帮助,我们将不胜感激!

这是我的 .sbt 文件:

name := "ErrorParserAPI"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "1.6.1",
        "org.apache.spark" % "spark-sql_2.10" % "1.6.1"
)

编辑:打字错误

我刚刚复制了您的代码并粘贴到我的 eclipse 中,它工作正常,没有任何编译错误。如果您使用的是 eclipse ,您可以尝试清理和刷新您的项目。

import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object ErrorParser {


  def filterErrors(errors: Array[Array[String]]): Array[Array[String]] = {

    var filteredErrors = ArrayBuffer[Array[String]]()

    return filteredErrors.toArray
  }

  def main(args: Array[String]) {



    val conf = new SparkConf().setAppName("ErrorParserAPI")
    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
    var errors = logs.filter(line => line.contains("ERROR"))

    val errors1 = errors.map(line => splitError(line))
    val filteredErrors = filterErrors(errors1.collect)

    val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
    val filteredRDD = sc.parallelize(dfErrors)
    var errorDF = filteredRDD.toDF()
  }

  case class Error(time: String, status: String, statusType: String, host: String, message: String)

  def splitError(line: String): Array[String] = {

    var array: Array[String] = new Array[String](5)

    return array

  }
}