覆盖 Spark 数据框架构
Overwrite Spark dataframe schema
后期编辑:
基于此 article 似乎 Spark 无法编辑 RDD 或列。必须使用新类型创建一个新类型并删除旧类型。下面建议的 for 循环和 .withColumn 方法似乎是完成工作的最简单方法。
原问题:
有没有一种简单的方法(对于人和机器)将多列转换为不同的数据类型?
我尝试手动定义架构,然后使用此架构从 parquet 文件加载数据并将其保存到另一个文件,但我每次都得到 "Job aborted."..."Task failed while writing rows"每个DF。对我来说有点容易,对 Spark 来说有点费力......而且它不起作用。
另一个选项正在使用:
df = df.withColumn("new_col", df("old_col").cast(type)).drop("old_col").withColumnRenamed("new_col", "old_col")
我需要做更多的工作,因为有将近 100 列,而且如果 Spark 必须复制内存中的每一列,那么这听起来也不是最佳选择。有没有更简单的方法?
根据转换规则的复杂程度,您可以使用此循环完成您的要求:
scala> var df = Seq((1,2),(3,4)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala> df.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> > df.columns.foreach{c => df = df.withColumn(c, df(c).cast(DoubleType))}
scala> df.show
+---+---+
| a| b|
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+
这应该与任何其他列操作一样有效。
后期编辑: 基于此 article 似乎 Spark 无法编辑 RDD 或列。必须使用新类型创建一个新类型并删除旧类型。下面建议的 for 循环和 .withColumn 方法似乎是完成工作的最简单方法。
原问题: 有没有一种简单的方法(对于人和机器)将多列转换为不同的数据类型?
我尝试手动定义架构,然后使用此架构从 parquet 文件加载数据并将其保存到另一个文件,但我每次都得到 "Job aborted."..."Task failed while writing rows"每个DF。对我来说有点容易,对 Spark 来说有点费力......而且它不起作用。
另一个选项正在使用:
df = df.withColumn("new_col", df("old_col").cast(type)).drop("old_col").withColumnRenamed("new_col", "old_col")
我需要做更多的工作,因为有将近 100 列,而且如果 Spark 必须复制内存中的每一列,那么这听起来也不是最佳选择。有没有更简单的方法?
根据转换规则的复杂程度,您可以使用此循环完成您的要求:
scala> var df = Seq((1,2),(3,4)).toDF("a", "b")
df: org.apache.spark.sql.DataFrame = [a: int, b: int]
scala> df.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> > df.columns.foreach{c => df = df.withColumn(c, df(c).cast(DoubleType))}
scala> df.show
+---+---+
| a| b|
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+
这应该与任何其他列操作一样有效。