如何使用 scala 在 Spark 中使用 DataSet?
How to work with DataSet in Spark using scala?
我使用 DataFrame 加载 CSV,然后转换为 DataSet,但显示如下
此行有多个标记:
- 无法找到存储在数据集中的类型的编码器。导入支持原始类型(Int、String 等)和产品类型(case 类)
spark.implicits._ 将在未来的版本中添加对序列化其他类型的支持。
- 方法参数不足:(隐含证据 $2:
org.apache.spark.sql.Encoder[DataSet.spark.aacsv])org.apache.spark.sql.Dataset[DataSet.spark.aacsv]。未指定值参数证据$2
如何解决这个问题?。
我的代码是 -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
现在我像这样更改了 main 函数 -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
但它抛出错误 - 线程 "main" org.apache.spark.sql.AnalysisException 中的异常:无法解析给定输入列的“Adj_Close
”:[_c1、_c2、_c5、_c4 , _c6, _c3, _c0];第 1 行位置 7。我该怎么办?
现在我使用 spark 调度程序根据给定的时间间隔执行我的方法。但我指的是 link - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。请帮助我们。
尝试添加以下导入,然后再将 DF
转换为 DS
。
sc.implicits._
或
sqlContext.implicits._
有关使用数据集的更多信息https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets
您的 csv 文件中是否有 header(列名)?如果是,请尝试添加
.option("header","true")
在读取语句中。
例子:
sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]
.
下面的博客有不同的 Dataframes 和 Dataset 示例:http://technippet.blogspot.in/2016/10/different-ways-of-creating.html
我使用 DataFrame 加载 CSV,然后转换为 DataSet,但显示如下
此行有多个标记:
- 无法找到存储在数据集中的类型的编码器。导入支持原始类型(Int、String 等)和产品类型(case 类)
spark.implicits._ 将在未来的版本中添加对序列化其他类型的支持。
- 方法参数不足:(隐含证据 $2:
org.apache.spark.sql.Encoder[DataSet.spark.aacsv])org.apache.spark.sql.Dataset[DataSet.spark.aacsv]。未指定值参数证据$2
如何解决这个问题?。 我的代码是 -
case class aaCSV(
a: String,
b: String
)
object WorkShop {
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val customSchema = StructType(Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv")
df.printSchema()
df.show()
val googleDS = df.as[aaCSV]
googleDS.show()
}
}
现在我像这样更改了 main 函数 -
def main(args: Array[String]) = {
val conf = new SparkConf()
.setAppName("readCSV")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV]
sa.printSchema()
sa.show()
}
但它抛出错误 - 线程 "main" org.apache.spark.sql.AnalysisException 中的异常:无法解析给定输入列的“Adj_Close
”:[_c1、_c2、_c5、_c4 , _c6, _c3, _c0];第 1 行位置 7。我该怎么办?
现在我使用 spark 调度程序根据给定的时间间隔执行我的方法。但我指的是 link - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application。请帮助我们。
尝试添加以下导入,然后再将 DF
转换为 DS
。
sc.implicits._
或
sqlContext.implicits._
有关使用数据集的更多信息https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets
您的 csv 文件中是否有 header(列名)?如果是,请尝试添加
.option("header","true")
在读取语句中。
例子:
sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV]
.
下面的博客有不同的 Dataframes 和 Dataset 示例:http://technippet.blogspot.in/2016/10/different-ways-of-creating.html