在 Apache Spark 1.3 中向数据框追加一列
Append a column to Data Frame in Apache Spark 1.3
向数据框添加列是否可行?最有效的方法是什么?
更具体地说,列可以作为现有数据框的行 ID。
在简化的情况下,从文件中读取而不对其进行标记化,我可以想到如下所示的内容(在 Scala 中),但它完成时出现错误(在第 3 行),而且无论如何看起来都不是最好的可能的路线:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
我发布问题已经有一段时间了,似乎其他人也想得到答案。以下是我找到的内容。
所以最初的任务是将带有行标识符的列(基本上,一个序列 1 to numRows
)附加到任何给定的数据框,以便可以跟踪行 order/presence(例如,当您采样时).这可以通过以下方式实现:
sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
关于将任何列附加到任何数据框的一般情况:
Spark API 中此功能的 "closest" 是 withColumn
和 withColumnRenamed
。根据Scala docs,前Returns一个新的DataFrame,增加一列。在我看来,这是一个有点混乱和不完整的定义。这两个函数只能在 this
数据帧上运行,即给定两个数据帧 df1
和 df2
列 col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
因此,除非您能够设法将现有数据框中的列转换为您需要的形状,否则您不能使用 withColumn
或 withColumnRenamed
来附加任意列(独立或其他数据框) ).
正如上面评论的那样,变通解决方案可能是使用 join
- 这会很混乱,尽管可能 - 将上面带有 zipWithIndex
的唯一键附加到两个数据框或专栏可能会起作用。虽然效率是...
很明显,将列附加到数据框对于分布式环境来说并不是一项简单的功能,而且可能根本没有非常有效、简洁的方法。但我认为,即使有性能警告,让这个核心功能可用仍然非常重要。
我从上面的答案中得到了帮助。但是,如果我们想要更改 DataFrame
并且当前的 API 在 Spark 1.6
中几乎没有什么不同,我发现它不完整。
zipWithIndex()
returns Tuple
of (Row, Long)
其中包含每一行和相应的索引。我们可以根据需要使用它来创建新的 Row
。
val rdd = df.rdd.zipWithIndex()
.map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
希望对您有所帮助。
不确定它是否适用于 spark 1.3 但在 spark 1.5 中我使用 withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
当我需要使用与数据框的现有列无关的值时,我会使用它
这与@NehaM 的回答相似但更简单
您可以使用如下所示的 row_number with Window function 来获取数据框中每一行的不同 ID。
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
您也可以使用 monotonically_increasing_id
与
相同
df.withColumn("ID", monotonically_increasing_id())
还有一些other ways。
向数据框添加列是否可行?最有效的方法是什么?
更具体地说,列可以作为现有数据框的行 ID。
在简化的情况下,从文件中读取而不对其进行标记化,我可以想到如下所示的内容(在 Scala 中),但它完成时出现错误(在第 3 行),而且无论如何看起来都不是最好的可能的路线:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
我发布问题已经有一段时间了,似乎其他人也想得到答案。以下是我找到的内容。
所以最初的任务是将带有行标识符的列(基本上,一个序列 1 to numRows
)附加到任何给定的数据框,以便可以跟踪行 order/presence(例如,当您采样时).这可以通过以下方式实现:
sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
关于将任何列附加到任何数据框的一般情况:
Spark API 中此功能的 "closest" 是 withColumn
和 withColumnRenamed
。根据Scala docs,前Returns一个新的DataFrame,增加一列。在我看来,这是一个有点混乱和不完整的定义。这两个函数只能在 this
数据帧上运行,即给定两个数据帧 df1
和 df2
列 col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
因此,除非您能够设法将现有数据框中的列转换为您需要的形状,否则您不能使用 withColumn
或 withColumnRenamed
来附加任意列(独立或其他数据框) ).
正如上面评论的那样,变通解决方案可能是使用 join
- 这会很混乱,尽管可能 - 将上面带有 zipWithIndex
的唯一键附加到两个数据框或专栏可能会起作用。虽然效率是...
很明显,将列附加到数据框对于分布式环境来说并不是一项简单的功能,而且可能根本没有非常有效、简洁的方法。但我认为,即使有性能警告,让这个核心功能可用仍然非常重要。
我从上面的答案中得到了帮助。但是,如果我们想要更改 DataFrame
并且当前的 API 在 Spark 1.6
中几乎没有什么不同,我发现它不完整。
zipWithIndex()
returns Tuple
of (Row, Long)
其中包含每一行和相应的索引。我们可以根据需要使用它来创建新的 Row
。
val rdd = df.rdd.zipWithIndex()
.map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
希望对您有所帮助。
不确定它是否适用于 spark 1.3 但在 spark 1.5 中我使用 withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
当我需要使用与数据框的现有列无关的值时,我会使用它
这与@NehaM 的回答相似但更简单
您可以使用如下所示的 row_number with Window function 来获取数据框中每一行的不同 ID。
df.withColumn("ID", row_number() over Window.orderBy("any column name in the dataframe"))
您也可以使用 monotonically_increasing_id
与
df.withColumn("ID", monotonically_increasing_id())
还有一些other ways。