如何将 maptype 转换为 Spark 中的 SparkML 稀疏向量?

How to convert a maptype into SparkML sparse vector in Spark?

我的原始模式包含许多我想在 ML 模型中使用的映射类型,因此我需要将它们转换为 SparkML 稀疏向量。

root
 |-- colA: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- colB: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- colC: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

上下文: SparkML 模型要求将数据形成为特征向量。有 一些生成特征向量的实用程序,但 none 支持 maptype 类型。 例如 SparkML VectorAssembler 允许组合多个列(所有数字类型、布尔类型或向量类型)。

编辑:

到目前为止,我的解决方案是将地图单独分解成列,然后使用 VectorAssembler:

val listkeysColA = df.select(explode($"colA"))
  .select($"key").as[Int].distinct.collect.sorted

val exploded= df.select(listkeysColA.map(x => 
  $"colA".getItem(x).alias(x.toString)): _*).na.fill(0) 

val columnNames = exploded.columns

val assembler = new VectorAssembler().setInputCols(columnNames).setOutputCol("features")

编辑2:

我应该补充一点,我的地图中的数据非常稀疏,并且事先没有已知的键集。这就是为什么在我当前的解决方案中,我首先传递给数据以收集和排序键。然后我使用 getItem(keyName).

访问这些值

据我所知,Spark 中没有内置方法,因此 UDF 在这种情况下是合适的解决方案。这是一个带有 Map[String, Double] 和 returns ML 向量的列:

val toVector = udf((m: Map[String, Double]) => Vectors.dense(m.values.toArray).toSparse)

由于 Map 没有顺序,因此也不能保证生成的向量具有特定顺序。

示例输入 (df):

+---------------------------------+---------------------------------+
|colA                             |colB                             |
+---------------------------------+---------------------------------+
|Map(a -> 1.0, b -> 2.0, c -> 3.0)|Map(a -> 1.0, b -> 2.0, c -> 3.0)|
+---------------------------------+---------------------------------+

并使用 UDF

val df2 = df.withColumn("colA", toVector($"colA")).withColumn("colB", toVector($"colB"))

给出以下输出:

+-------------+-------------+
|colA         |colB         |
+-------------+-------------+
|[1.0,2.0,3.0]|[1.0,2.0,3.0]|
+-------------+-------------+

其中两列都是矢量类型。

root
 |-- colA: vector (nullable = true)
 |-- colB: vector (nullable = true)

如果您想将所有列合并到一个向量中,这里使用 VectorAssembler 是合适的,如问题编辑中那样。


编辑:

如果您需要保持值的特定顺序,那么您需要像您所做的那样先收集所有键。但是,您可以避免使用 explode

val keys = df.select($"colA")
  .flatMap(_.getAs[Map[String, Int]]("colA").keys)
  .distinct
  .collect
  .sorted

然后适当更改 UDF 以考虑 keys 的顺序,默认值为 0.0:

val toVector = udf((m: Map[String, Double]) => 
  Vectors.dense(keys.map(key => m.getOrElse(key, 0.0))).toSparse
)