Spark 元组每个键获得 details/rdd
Spark Tuple get details/rdd per key
我有这些行:
(key1,Illinois|111|67342|...)
(key1,Illinois|121|67142|...)
(key2,Hawaii|113|67343|...)
(key1,Illinois|211|67442|...)
(key3,Hawaii|153|66343|...)
(key3,Ohio|193|68343|...)
(1) 如何获得唯一密钥?
(2) 如何获取每个键的行数(key1 - 3 行,key2 - 1 行,键 3 - 2 行...因此输出为:3,1,2)
(3) 如何获取每个键的行的字节大小 (5MB,2MB,3MB)
编辑 1. 这是我的新代码:
val rdd : RDD[(String, Array[String])] = ...
val rdd_res = rdd.groupByKey().map(row => (row._1, row._2.size, byteSize(row._2)))
val rddKeys = rdd_res.map(row => row._1)
val rddCount = rdd_res.map(row => row._2)
val rddByteSize = rdd_res.map(row => row._3)
如何实现字节大小?我想获取将保存到磁盘的大小。
编辑 2.
val rdd_res : RDD[(String, (Int, Int))] = rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value)), (first, second) => (first._1 + second._1, first._2 + second._2))
val rdd_res_keys = rdd_res.map(row=>row._1).collect().mkString(",")
val rdd_res_count = rdd_res.map(row=>row._2).collect().map(_._1).mkString(",")
val rdd_res_bytes = rdd_res.map(row=>row._2).collect().map(_._2).mkString(",")
考虑到你有一对(键,值)RDD。
您可以使用下面的方法获取密钥和计数
rdd_res = rdd_inp.countByKey
您可以使用以下
列出键的大小
rdd_size_res = rdd_inp.groupByKey().map((a,b)=>(a,size(b)))
def size(src: List[String]):List[String] = {
src.map(a => (32 + a.length() * 2).toString())
}
请检查以上是否适用于您的场景。
对于不同的键,您应该调换顺序:
rdd.keys.distinct.collect
但是从技术上讲,您是通过将键数计算到地图中得到的...通过 countByKey
其中 returns 是 key->count
的地图
rdd.countByKey
并且,要获取字节大小,您应该查看 this SO question,因为它将取决于解码。但是,一旦您决定了尺寸方法,您就可以通过以下方式获得它:
rdd.aggregateByKey(0)((accum, value) => accum + size(value), _ + _)
或者,您可以一次完成所有操作:
rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value), (first, second) => (first._1 + second._1, first._2 + second._2))
这应该产生一个 RDD[(String, (Int, Int))]
,其中元组中的第一项是键数,第二项是键大小
我有这些行:
(key1,Illinois|111|67342|...)
(key1,Illinois|121|67142|...)
(key2,Hawaii|113|67343|...)
(key1,Illinois|211|67442|...)
(key3,Hawaii|153|66343|...)
(key3,Ohio|193|68343|...)
(1) 如何获得唯一密钥?
(2) 如何获取每个键的行数(key1 - 3 行,key2 - 1 行,键 3 - 2 行...因此输出为:3,1,2)
(3) 如何获取每个键的行的字节大小 (5MB,2MB,3MB)
编辑 1. 这是我的新代码:
val rdd : RDD[(String, Array[String])] = ...
val rdd_res = rdd.groupByKey().map(row => (row._1, row._2.size, byteSize(row._2)))
val rddKeys = rdd_res.map(row => row._1)
val rddCount = rdd_res.map(row => row._2)
val rddByteSize = rdd_res.map(row => row._3)
如何实现字节大小?我想获取将保存到磁盘的大小。
编辑 2.
val rdd_res : RDD[(String, (Int, Int))] = rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value)), (first, second) => (first._1 + second._1, first._2 + second._2))
val rdd_res_keys = rdd_res.map(row=>row._1).collect().mkString(",")
val rdd_res_count = rdd_res.map(row=>row._2).collect().map(_._1).mkString(",")
val rdd_res_bytes = rdd_res.map(row=>row._2).collect().map(_._2).mkString(",")
考虑到你有一对(键,值)RDD。
您可以使用下面的方法获取密钥和计数
rdd_res = rdd_inp.countByKey
您可以使用以下
列出键的大小rdd_size_res = rdd_inp.groupByKey().map((a,b)=>(a,size(b)))
def size(src: List[String]):List[String] = {
src.map(a => (32 + a.length() * 2).toString())
}
请检查以上是否适用于您的场景。
对于不同的键,您应该调换顺序:
rdd.keys.distinct.collect
但是从技术上讲,您是通过将键数计算到地图中得到的...通过 countByKey
其中 returns 是 key->count
rdd.countByKey
并且,要获取字节大小,您应该查看 this SO question,因为它将取决于解码。但是,一旦您决定了尺寸方法,您就可以通过以下方式获得它:
rdd.aggregateByKey(0)((accum, value) => accum + size(value), _ + _)
或者,您可以一次完成所有操作:
rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value), (first, second) => (first._1 + second._1, first._2 + second._2))
这应该产生一个 RDD[(String, (Int, Int))]
,其中元组中的第一项是键数,第二项是键大小