如何在 Scala 的 Apache Spark 中将 RDD(在文本文件目录中读取)转换为数据帧?
How to convert an RDD (that read in a directory of text files) into dataFrame in Apache Spark in Scala?
我正在使用 Apache Spark TF-IDF 开发 Scala 特征提取应用程序。我需要从文本文件目录中读入。我正在尝试将 RDD 转换为数据帧,但出现错误 "value toDF() is not a member of org.apache.spark.rdd.RDD[streamedRDD]"。这就是我现在拥有的...
我有 spark-2.2.1 和 Scala 2.1.11。提前致谢。
代码:
// Creating the Spark context that will interface with Spark
val conf = new SparkConf()
.setMaster("local")
.setAppName("TextClassification")
val sc = new SparkContext(conf)
// Load documents (one per line)
val data = sc.wholeTextFiles("C:/Users/*")
val text = data.map{case(filepath,text) => text}
val id = data.map{case(filepath, text) => text.split("@").takeRight(1)(0)}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class dataStreamed(id: String, input: String)
val tweetsDF = data
.map{case (filepath, text) =>
val id = text.split("@").takeRight(1)(0)
val input = text.split(":").takeRight(2)(0)
dataStreamed(id, input)}
.as[dataStreamed]
.toDF()
.cache()
// -------------------- TF-IDF --------------------
// From spark.apache.org
// URL http://spark.apache.org/docs/latest/ml-features.html#tf-idf
val tokenizer = new Tokenizer().setInputCol("input").setOutputCol("words")
val wordsData = tokenizer.transform(tweetsDF)
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
val tf = hashingTF.transform(wordsData).cache() // Hashed words
// Compute for the TFxIDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val tfidf = idf.fit(tf)
数据:(文件夹中的这些文本文件是我需要读入的)
https://www.dropbox.com/s/cw3okhaosu7i1md/cars.txt?dl=0
https://www.dropbox.com/s/29tgqg7ifpxzwwz/Italy.txt?dl=0
这里的问题是 map 函数 returns 您分配给 tweetsDF 的一种 Dataset[Row] 类型。应该是:
case class dataStreamed(id: String, input: String)
def test() = {
val sparkConf = new SparkConf().setAppName("TextClassification").setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sqlContext = spark.sqlContext
import sqlContext.implicits._
// Load documents (one per line)
val data = spark.sparkContext.wholeTextFiles("C:\tmp\Whosebug\*")
val dataset = spark.createDataset(data)
val tweetsDF = dataset
.map{case (id : String, input : String) =>
val file = id.split("@").takeRight(1)(0)
val content = input.split(":").takeRight(2)(0)
dataStreamed(file, content)}
.as[dataStreamed]
tweetsDF.printSchema()
tweetsDF.show(10)
}
第一个数据将是一个 RDD(String, String),然后我用 spark.createDataset 创建一个新数据集,以便能够将 map 与 case class 一起正确使用。请注意,您必须在您的方法中定义 dataStreamedclass(在这种情况下进行测试)
祝你好运
我们可以用几个 commands/functions:
调用spark/scalashell,可以使用driver-memory、executor-memory、executor-cores等适合你的工作
spark-shell
从HDFS读取文本文件
val text_rdd = sc.textFile("path/to/file/on/hdfs")
将文本rdd转换为DataFrame
val text_df = text_rdd.toDF
在 HDFS 中保存为计划文本格式
text_df.saveAsTextFile("path/to/hdfs")
在 HDFS 中保存为可拆分压缩格式
text_df.coalesce(1).write.parquet("path/to/hdfs")
我正在使用 Apache Spark TF-IDF 开发 Scala 特征提取应用程序。我需要从文本文件目录中读入。我正在尝试将 RDD 转换为数据帧,但出现错误 "value toDF() is not a member of org.apache.spark.rdd.RDD[streamedRDD]"。这就是我现在拥有的...
我有 spark-2.2.1 和 Scala 2.1.11。提前致谢。
代码:
// Creating the Spark context that will interface with Spark
val conf = new SparkConf()
.setMaster("local")
.setAppName("TextClassification")
val sc = new SparkContext(conf)
// Load documents (one per line)
val data = sc.wholeTextFiles("C:/Users/*")
val text = data.map{case(filepath,text) => text}
val id = data.map{case(filepath, text) => text.split("@").takeRight(1)(0)}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class dataStreamed(id: String, input: String)
val tweetsDF = data
.map{case (filepath, text) =>
val id = text.split("@").takeRight(1)(0)
val input = text.split(":").takeRight(2)(0)
dataStreamed(id, input)}
.as[dataStreamed]
.toDF()
.cache()
// -------------------- TF-IDF --------------------
// From spark.apache.org
// URL http://spark.apache.org/docs/latest/ml-features.html#tf-idf
val tokenizer = new Tokenizer().setInputCol("input").setOutputCol("words")
val wordsData = tokenizer.transform(tweetsDF)
val hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
val tf = hashingTF.transform(wordsData).cache() // Hashed words
// Compute for the TFxIDF
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val tfidf = idf.fit(tf)
数据:(文件夹中的这些文本文件是我需要读入的) https://www.dropbox.com/s/cw3okhaosu7i1md/cars.txt?dl=0 https://www.dropbox.com/s/29tgqg7ifpxzwwz/Italy.txt?dl=0
这里的问题是 map 函数 returns 您分配给 tweetsDF 的一种 Dataset[Row] 类型。应该是:
case class dataStreamed(id: String, input: String)
def test() = {
val sparkConf = new SparkConf().setAppName("TextClassification").setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sqlContext = spark.sqlContext
import sqlContext.implicits._
// Load documents (one per line)
val data = spark.sparkContext.wholeTextFiles("C:\tmp\Whosebug\*")
val dataset = spark.createDataset(data)
val tweetsDF = dataset
.map{case (id : String, input : String) =>
val file = id.split("@").takeRight(1)(0)
val content = input.split(":").takeRight(2)(0)
dataStreamed(file, content)}
.as[dataStreamed]
tweetsDF.printSchema()
tweetsDF.show(10)
}
第一个数据将是一个 RDD(String, String),然后我用 spark.createDataset 创建一个新数据集,以便能够将 map 与 case class 一起正确使用。请注意,您必须在您的方法中定义 dataStreamedclass(在这种情况下进行测试)
祝你好运
我们可以用几个 commands/functions:
调用spark/scalashell,可以使用driver-memory、executor-memory、executor-cores等适合你的工作
spark-shell
从HDFS读取文本文件
val text_rdd = sc.textFile("path/to/file/on/hdfs")
将文本rdd转换为DataFrame
val text_df = text_rdd.toDF
在 HDFS 中保存为计划文本格式
text_df.saveAsTextFile("path/to/hdfs")
在 HDFS 中保存为可拆分压缩格式
text_df.coalesce(1).write.parquet("path/to/hdfs")