将 Spark DataFrame 转换为 HashMap

Convert Spark DataFrame to HashMap

我有一个如下所示的数据框:

product1 product2 difference
123      456      0.5
123      789      1
456      789      0
456      123      0.5
789      123      1
789      456      0

我想要这样的输出:

{'123': {'456': 0.5, '789': 1}, 456: {'123': 0.5, '789': 1}, '789': {'123': 1, '456': 0}}

到目前为止,我已经尝试了 zipWithIndexcollectAsMap,但没有成功。

到目前为止我尝试过的代码是:

val tples: RDD[(Int, (Int, Double))] = (products.rdd
  .map(r => (r(0).toString.toDouble.toInt, (r(1).toString.toDouble.toInt, r(2).toString.toDouble))))
val lst: = tpls.groupByKey().map(r => (r._1, r._2.toSeq))

这为我提供了产品和差异列表,而不是散列图

如果我没有正确理解你的问题,你想要这样的东西:

val myRdd = sc.makeRDD(List(
  (123, (456, 0.5)), 
  (123, (789, 1.0)), 
  (456, (789, 0.0)), 
  (456, (123, 0.5)), 
  (789, (123, 1.0)), 
  (789, (456, 0.0))
))


val myHashMap = myRdd.groupByKey.mapValues(_.toMap).collect.toMap

// gives:
// scala.collection.immutable.Map[Int,scala.collection.immutable.Map[Int,Double]] = 
//   Map(
//     456 -> Map(789 -> 0.0, 123 -> 0.5), 
//     789 -> Map(123 -> 1.0, 456 -> 0.0), 
//     123 -> Map(456 -> 0.5, 789 -> 1.0)
//   )

简要说明:groupByKey 给你像 (123, Seq((456, 0.5), (789, 1.0))) 这样的元组。您要将第二个组件 ("values") 转换为地图,因此调用 mapValues(_.toMap)。然后(如果你真的想将集合加载到你的节点并将其转换为本地,非分布式地图),你必须调用collect。这实质上为您提供了 (Int, Map[Int, Double]) 类型的元组列表。现在您可以调用此集合上的 toMap 来获取地图的地图。

可以先将dataframe转为RDD,转为key-value类型,再进行groupByKey。要获得想要的 Map 形式的结果,您需要 collect 分组的 RDD(因此对于大型数据集可能不可行):

val df = Seq(
  (123, 456, 0.5),
  (123, 789, 1.0),
  (456, 789, 0.0),
  (456, 123, 0.5),
  (789, 123, 1.0),
  (789, 456, 0.0)
).toDF("product1", "product2", "difference")

import org.apache.spark.sql.Row

val groupedRDD = df.rdd.map{
    case Row(p1: Int, p2: Int, diff: Double) => (p1, (p2, diff))
  }.
  groupByKey.mapValues(_.toMap)

groupedRDD.collectAsMap
// res1: scala.collection.immutable.Map[Any,scala.collection.immutable.Map[Int,Double]] = Map(
//   456 -> Map(789 -> 0.0, 123 -> 0.5), 789 -> Map(123 -> 1.0, 456 -> 0.0), 123 -> Map(456 -> 0.5, 789 -> 1.0)
// )