如何在 Scala 的 Apache Spark 中将 RDD(在文本文件目录中读取)转换为数据帧?

How to convert an RDD (that read in a directory of text files) into dataFrame in Apache Spark in Scala?

我正在使用 Apache Spark TF-IDF 开发 Scala 特征提取应用程序。我需要从文本文件目录中读入。我正在尝试将 RDD 转换为数据帧,但出现错误 "value toDF() is not a member of org.apache.spark.rdd.RDD[streamedRDD]"。这就是我现在拥有的...

我有 spark-2.2.1 和 Scala 2.1.11。提前致谢。

代码:

// Creating the Spark context that will interface with Spark
val conf = new SparkConf()
           .setMaster("local")
           .setAppName("TextClassification")
val sc = new SparkContext(conf)

// Load documents (one per line)
val data = sc.wholeTextFiles("C:/Users/*")    
val text = data.map{case(filepath,text) => text}    
val id = data.map{case(filepath, text) => text.split("@").takeRight(1)(0)}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class dataStreamed(id: String, input: String)

val tweetsDF = data
              .map{case (filepath, text) => 
                   val id = text.split("@").takeRight(1)(0)
                   val input = text.split(":").takeRight(2)(0)
                   dataStreamed(id, input)}
              .as[dataStreamed]
              .toDF()
              .cache()

// -------------------- TF-IDF --------------------
// From spark.apache.org
// URL http://spark.apache.org/docs/latest/ml-features.html#tf-idf

val tokenizer = new Tokenizer().setInputCol("input").setOutputCol("words")
val wordsData = tokenizer.transform(tweetsDF)
val hashingTF = new HashingTF()
                .setInputCol("words")
                .setOutputCol("rawFeatures")

val tf = hashingTF.transform(wordsData).cache()  // Hashed words

// Compute for the TFxIDF    
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val tfidf = idf.fit(tf)

数据:(文件夹中的这些文本文件是我需要读入的) https://www.dropbox.com/s/cw3okhaosu7i1md/cars.txt?dl=0 https://www.dropbox.com/s/29tgqg7ifpxzwwz/Italy.txt?dl=0

这里的问题是 map 函数 returns 您分配给 tweetsDF 的一种 Dataset[Row] 类型。应该是:

case class dataStreamed(id: String, input: String)
def test() = {

  val sparkConf = new SparkConf().setAppName("TextClassification").setMaster("local")
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()

  val sqlContext = spark.sqlContext
  import sqlContext.implicits._

  // Load documents (one per line)
  val data = spark.sparkContext.wholeTextFiles("C:\tmp\Whosebug\*")

  val dataset = spark.createDataset(data)

  val tweetsDF = dataset
    .map{case (id : String, input : String) =>
      val file = id.split("@").takeRight(1)(0)
      val content = input.split(":").takeRight(2)(0)
      dataStreamed(file, content)}
    .as[dataStreamed]

  tweetsDF.printSchema()
  tweetsDF.show(10)
}

第一个数据将是一个 RDD(String, String),然后我用 spark.createDataset 创建一个新数据集,以便能够将 map 与 case class 一起正确使用。请注意,您必须在您的方法中定义 dataStreamedclass(在这种情况下进行测试)

祝你好运

我们可以用几个 commands/functions:

  • 调用spark/scalashell,可以使用driver-memory、executor-memory、executor-cores等适合你的工作

    spark-shell

  • 从HDFS读取文本文件

    val text_rdd = sc.textFile("path/to/file/on/hdfs")

  • 将文本rdd转换为DataFrame

    val text_df = text_rdd.toDF

  • 在 HDFS 中保存为计划文本格式

    text_df.saveAsTextFile("path/to/hdfs")

  • 在 HDFS 中保存为可拆分压缩格式

    text_df.coalesce(1).write.parquet("path/to/hdfs")