如何从 Spark 中的文本文件创建 DataFrame
How to create a DataFrame from a text file in Spark
我在 HDFS 上有一个文本文件,我想将其转换为 Spark 中的数据帧。
我正在使用 Spark 上下文加载文件,然后尝试从该文件生成单独的列。
val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
完成此操作后,我正在尝试以下操作。
myFile1.toDF()
我遇到了一个问题,因为 myFile1 RDD 中的元素现在是数组类型。
我该如何解决这个问题?
如果您想使用 toDF
方法,您必须将 RDD
of Array[String]
转换为 RDD
of case class。例如,你必须这样做:
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
更新 - 从 Spark 1.6 开始,您可以简单地使用 built-in csv 数据源:
spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")
您还可以使用各种选项来控制 CSV 解析,例如:
val df = spark.read.option("header", "false").csv("file.txt")
对于 Spark 版本 < 1.6:
最简单的方法是使用 spark-csv - 将其包含在您的依赖项中并遵循 README,它允许设置自定义分隔符 (;
),可以读取 CSV headers(如果您有) ,它可以推断模式 types(需要额外扫描数据)。
或者,如果您知道模式,您可以创建一个 case-class 来表示它,并将您的 RDD 元素映射到此 class 的实例中,然后再转换为 DataFrame,例如:
case class Record(id: Int, name: String)
val myFile1 = myFile.map(x=>x.split(";")).map {
case Array(id, name) => Record(id.toInt, name)
}
myFile1.toDF() // DataFrame will have columns "id" and "name"
val df = spark.read.textFile("abc.txt")
case class Abc (amount:Int, types: String, id:Int) //columns and data types
val df2 = df.map(rec=>Amount(rec(0).toInt, rec(1), rec(2).toInt))
rdd2.printSchema
root
|-- amount: integer (nullable = true)
|-- types: string (nullable = true)
|-- id: integer (nullable = true)
我知道我回答这个问题已经很晚了,但我想出了一个不同的答案:
val rdd = sc.textFile("/home/training/mydata/file.txt")
val text = rdd.map(lines=lines.split(",")).map(arrays=>(ararys(0),arrays(1))).toDF("id","name").show
我给出了从文本文件创建 DataFrame 的不同方法
val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)
原始文本文件
val file = sc.textFile("C:\vikas\spark\Interview\text.txt")
val fileToDf = file.map(_.split(",")).map{case Array(a,b,c) =>
(a,b.toInt,c)}.toDF("name","age","city")
fileToDf.foreach(println(_))
没有架构的 spark 会话
import org.apache.spark.sql.SparkSession
val sparkSess =
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()
val df = sparkSess.read.option("header",
"false").csv("C:\vikas\spark\Interview\text.txt")
df.show()
使用模式启动会话
import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header",
"false").schema(schema).csv("C:\vikas\spark\Interview\text.txt")
dfWithSchema.show()
使用 sql 上下文
import org.apache.spark.sql.SQLContext
val fileRdd =
sc.textFile("C:\vikas\spark\Interview\text.txt").map(_.split(",")).map{x
=> org.apache.spark.sql.Row(x:_*)}
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()
您可以读取一个文件以获得一个 RDD,然后为其分配模式。创建模式的两种常见方法是使用案例 class 或模式对象 [我的首选]。遵循您可能使用的快速代码片段。
案例Class方法
case class Test(id:String,name:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
模式方法
import org.apache.spark.sql.types._
val schemaString = "id name"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header","false").schema(schema).csv("file.txt")
dfWithSchema.show()
第二种是我的首选方法,因为案例 class 有最多 22 个字段的限制,如果您的文件超过 22 个字段,这将是一个问题!
除非使用隐式转换,否则您无法将其转换为数据框。
val sqlContext = new SqlContext(new SparkContext())
import sqlContext.implicits._
之后只有你可以将其转换为数据框
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
用竖线 (|) 分隔的 txt 文件可以读作:
df = spark.read.option("sep", "|").option("header", "true").csv("s3://bucket_name/folder_path/file_name.txt")
我在 HDFS 上有一个文本文件,我想将其转换为 Spark 中的数据帧。
我正在使用 Spark 上下文加载文件,然后尝试从该文件生成单独的列。
val myFile = sc.textFile("file.txt")
val myFile1 = myFile.map(x=>x.split(";"))
完成此操作后,我正在尝试以下操作。
myFile1.toDF()
我遇到了一个问题,因为 myFile1 RDD 中的元素现在是数组类型。
我该如何解决这个问题?
如果您想使用 toDF
方法,您必须将 RDD
of Array[String]
转换为 RDD
of case class。例如,你必须这样做:
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
更新 - 从 Spark 1.6 开始,您可以简单地使用 built-in csv 数据源:
spark: SparkSession = // create the Spark Session
val df = spark.read.csv("file.txt")
您还可以使用各种选项来控制 CSV 解析,例如:
val df = spark.read.option("header", "false").csv("file.txt")
对于 Spark 版本 < 1.6:
最简单的方法是使用 spark-csv - 将其包含在您的依赖项中并遵循 README,它允许设置自定义分隔符 (;
),可以读取 CSV headers(如果您有) ,它可以推断模式 types(需要额外扫描数据)。
或者,如果您知道模式,您可以创建一个 case-class 来表示它,并将您的 RDD 元素映射到此 class 的实例中,然后再转换为 DataFrame,例如:
case class Record(id: Int, name: String)
val myFile1 = myFile.map(x=>x.split(";")).map {
case Array(id, name) => Record(id.toInt, name)
}
myFile1.toDF() // DataFrame will have columns "id" and "name"
val df = spark.read.textFile("abc.txt")
case class Abc (amount:Int, types: String, id:Int) //columns and data types
val df2 = df.map(rec=>Amount(rec(0).toInt, rec(1), rec(2).toInt))
rdd2.printSchema
root
|-- amount: integer (nullable = true)
|-- types: string (nullable = true)
|-- id: integer (nullable = true)
我知道我回答这个问题已经很晚了,但我想出了一个不同的答案:
val rdd = sc.textFile("/home/training/mydata/file.txt")
val text = rdd.map(lines=lines.split(",")).map(arrays=>(ararys(0),arrays(1))).toDF("id","name").show
我给出了从文本文件创建 DataFrame 的不同方法
val conf = new SparkConf().setAppName(appName).setMaster("local")
val sc = SparkContext(conf)
原始文本文件
val file = sc.textFile("C:\vikas\spark\Interview\text.txt")
val fileToDf = file.map(_.split(",")).map{case Array(a,b,c) =>
(a,b.toInt,c)}.toDF("name","age","city")
fileToDf.foreach(println(_))
没有架构的 spark 会话
import org.apache.spark.sql.SparkSession
val sparkSess =
SparkSession.builder().appName("SparkSessionZipsExample")
.config(conf).getOrCreate()
val df = sparkSess.read.option("header",
"false").csv("C:\vikas\spark\Interview\text.txt")
df.show()
使用模式启动会话
import org.apache.spark.sql.types._
val schemaString = "name age city"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header",
"false").schema(schema).csv("C:\vikas\spark\Interview\text.txt")
dfWithSchema.show()
使用 sql 上下文
import org.apache.spark.sql.SQLContext
val fileRdd =
sc.textFile("C:\vikas\spark\Interview\text.txt").map(_.split(",")).map{x
=> org.apache.spark.sql.Row(x:_*)}
val sqlDf = sqlCtx.createDataFrame(fileRdd,schema)
sqlDf.show()
您可以读取一个文件以获得一个 RDD,然后为其分配模式。创建模式的两种常见方法是使用案例 class 或模式对象 [我的首选]。遵循您可能使用的快速代码片段。
案例Class方法
case class Test(id:String,name:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
模式方法
import org.apache.spark.sql.types._
val schemaString = "id name"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable=true))
val schema = StructType(fields)
val dfWithSchema = sparkSess.read.option("header","false").schema(schema).csv("file.txt")
dfWithSchema.show()
第二种是我的首选方法,因为案例 class 有最多 22 个字段的限制,如果您的文件超过 22 个字段,这将是一个问题!
除非使用隐式转换,否则您无法将其转换为数据框。
val sqlContext = new SqlContext(new SparkContext())
import sqlContext.implicits._
之后只有你可以将其转换为数据框
case class Test(id:String,filed2:String)
val myFile = sc.textFile("file.txt")
val df= myFile.map( x => x.split(";") ).map( x=> Test(x(0),x(1)) ).toDF()
用竖线 (|) 分隔的 txt 文件可以读作:
df = spark.read.option("sep", "|").option("header", "true").csv("s3://bucket_name/folder_path/file_name.txt")