从地图数组的 RDD 中获取最大的地图
Get largest Map from an RDD of Array of Maps
我在这样的 RDD 中有一个映射数组:
Map("id" -> 1, "name" -> "punit")
Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)
现在我的目标是将这个地图数组写入 CSV 文件,如下所示:
id,ph_no,name
1,,punit
2,123123,naik
ID 1 未提供 ph_no,这就是它在 CSV 中为空的原因。所以我想遍历这个RDD并找到最大的Map,这样我就可以通过提取它的键来命名header中的所有字段。
用 Scala 术语来说就是:
val x = Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)).maxBy(_.size)
这将正确地给我:
res0: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123)
我该怎么做?
您可以使用 .max()
,指定按地图大小排序。
scala> val rdd = sc.parallelize(Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Any]] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val maxMap = rdd.max()(Ordering.by(_.size))
maxMap: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123)
顺便说一下,由于您使用的是 CSV 文件,因此您可能会对使用 spark-csv
感兴趣。
寻找最大尺寸的 Map
元素可能不够准确,因为其中 none 可能拥有所有数据(从示例判断)。
您可以通过合并映射中所有不同的键来获取 headers 的列表。
类似于:
val rddOfMaps:RDD[Map[String,Any]] = sc.parallelize(Seq(Map("a"->1, "b"->2, "d"->3),Map("a"->2, "c"->4, "e" -> 1)))
val headers = rddOfMaps.flatMap(entry => entry.keySet).distinct.collect
val csvData = rddOfMaps.map(entry => header.map(column => entry.get(column).getOrElse("")).mkString(","))
// 1,2,,3,
// 2,,4,,1
我在这样的 RDD 中有一个映射数组:
Map("id" -> 1, "name" -> "punit")
Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)
现在我的目标是将这个地图数组写入 CSV 文件,如下所示:
id,ph_no,name
1,,punit
2,123123,naik
ID 1 未提供 ph_no,这就是它在 CSV 中为空的原因。所以我想遍历这个RDD并找到最大的Map,这样我就可以通过提取它的键来命名header中的所有字段。
用 Scala 术语来说就是:
val x = Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)).maxBy(_.size)
这将正确地给我:
res0: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123)
我该怎么做?
您可以使用 .max()
,指定按地图大小排序。
scala> val rdd = sc.parallelize(Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)))
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Any]] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val maxMap = rdd.max()(Ordering.by(_.size))
maxMap: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123)
顺便说一下,由于您使用的是 CSV 文件,因此您可能会对使用 spark-csv
感兴趣。
寻找最大尺寸的 Map
元素可能不够准确,因为其中 none 可能拥有所有数据(从示例判断)。
您可以通过合并映射中所有不同的键来获取 headers 的列表。
类似于:
val rddOfMaps:RDD[Map[String,Any]] = sc.parallelize(Seq(Map("a"->1, "b"->2, "d"->3),Map("a"->2, "c"->4, "e" -> 1)))
val headers = rddOfMaps.flatMap(entry => entry.keySet).distinct.collect
val csvData = rddOfMaps.map(entry => header.map(column => entry.get(column).getOrElse("")).mkString(","))
// 1,2,,3,
// 2,,4,,1