使用 Spark Dataframe scala 将多个不同的列转换为 Map 列
Converting multiple different columns to Map column with Spark Dataframe scala
我有一个包含列的数据框:user, address1, address2, address3, phone1, phone2
等等。
我想将此数据框转换为 - user, address, phone where address = Map("address1" -> address1.value, "address2" -> address2.value, "address3" -> address3.value)
我能够使用以下方法将列转换为地图:
val mapData = List("address1", "address2", "address3")
df.map(_.getValuesMap[Any](mapData))
但我不确定如何将其添加到我的 df。
我是 spark 和 scala 的新手,在这里真的需要一些帮助。
Spark >= 2.0
您可以跳过udf
并使用map
(Python中的create_map
)SQL函数:
import org.apache.spark.sql.functions.map
df.select(
map(mapData.map(c => lit(c) :: col(c) :: Nil).flatten: _*).alias("a_map")
)
Spark < 2.0
据我所知,没有直接的方法。您可以像这样使用 UDF:
import org.apache.spark.sql.functions.{udf, array, lit, col}
val df = sc.parallelize(Seq(
(1L, "addr1", "addr2", "addr3")
)).toDF("user", "address1", "address2", "address3")
val asMap = udf((keys: Seq[String], values: Seq[String]) =>
keys.zip(values).filter{
case (k, null) => false
case _ => true
}.toMap)
val keys = array(mapData.map(lit): _*)
val values = array(mapData.map(col): _*)
val dfWithMap = df.withColumn("address", asMap(keys, values))
另一个不需要 UDF 的选项是构造字段而不是映射:
val dfWithStruct = df.withColumn("address", struct(mapData.map(col): _*))
最大的优点是可以轻松处理不同类型的值
我有一个包含列的数据框:user, address1, address2, address3, phone1, phone2
等等。
我想将此数据框转换为 - user, address, phone where address = Map("address1" -> address1.value, "address2" -> address2.value, "address3" -> address3.value)
我能够使用以下方法将列转换为地图:
val mapData = List("address1", "address2", "address3")
df.map(_.getValuesMap[Any](mapData))
但我不确定如何将其添加到我的 df。
我是 spark 和 scala 的新手,在这里真的需要一些帮助。
Spark >= 2.0
您可以跳过udf
并使用map
(Python中的create_map
)SQL函数:
import org.apache.spark.sql.functions.map
df.select(
map(mapData.map(c => lit(c) :: col(c) :: Nil).flatten: _*).alias("a_map")
)
Spark < 2.0
据我所知,没有直接的方法。您可以像这样使用 UDF:
import org.apache.spark.sql.functions.{udf, array, lit, col}
val df = sc.parallelize(Seq(
(1L, "addr1", "addr2", "addr3")
)).toDF("user", "address1", "address2", "address3")
val asMap = udf((keys: Seq[String], values: Seq[String]) =>
keys.zip(values).filter{
case (k, null) => false
case _ => true
}.toMap)
val keys = array(mapData.map(lit): _*)
val values = array(mapData.map(col): _*)
val dfWithMap = df.withColumn("address", asMap(keys, values))
另一个不需要 UDF 的选项是构造字段而不是映射:
val dfWithStruct = df.withColumn("address", struct(mapData.map(col): _*))
最大的优点是可以轻松处理不同类型的值