如何在 Spark 中按键对 RDD 进行分区?
How to partition RDD by key in Spark?
鉴于 HashPartitioner 文档说:
[HashPartitioner] implements hash-based partitioning using Java's
Object.hashCode.
假设我想按 kind
对 DeviceData
进行分区。
case class DeviceData(kind: String, time: Long, data: String)
通过覆盖 deviceData.hashCode()
方法并仅使用 kind
的哈希码对 RDD[DeviceData]
进行分区是否正确?
但是考虑到 HashPartitioner
需要多个分区参数,我很困惑是否需要提前知道种类的数量,如果种类多于分区会怎样?
如果我将分区数据写入磁盘,它在读取时将保持分区状态是否正确?
我的目标是打电话给
deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)
并且在迭代器中只有 DeviceData
具有相同的 kind
值。
使用 kind
做一个 groupByKey
怎么样?或者另一种 PairRDDFunctions
方法。
你让我觉得你并不真正关心分区,只是你在一个处理流程中获得所有特定种类的东西?
配对函数允许这样做:
rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
.foreachPartition(...)
但是,您可能会更安全一些:
rdd.keyBy(_.kind).reduceByKey(....)
或 mapValues
或其他一些配对函数,保证您获得完整的片段
Would it be correct to partition an RDD[DeviceData] by overwriting the deviceData.hashCode() method and use only the hashcode of kind?
不会的。如果您查看 Java Object.hashCode
文档,您将找到有关 hashCode
总合同的以下信息:
If two objects are equal according to the equals(Object) method, then calling the hashCode method on each of the two objects must produce the same integer result.
因此,除非纯粹基于 kind
设备的平等概念适合您的用例,而且我严重怀疑它是否适合,否则修改 HashCode
以获得所需的分区是一个坏主意。在一般情况下,您应该 implement your own partitioner 但这里不是必需的。
因为,除了 SQL 和 GraphX 中的特殊情况,partitionBy
仅在 PairRDD
上有效,因此创建 RDD[(String, DeviceData)]
并使用普通 HashPartitioner
deviceDataRdd.map(dev => (dev.kind, dev)).partitionBy(new HashPartitioner(n))
请记住,在 kind
具有低基数或高度偏斜分布的情况下,使用它进行分区可能不是最佳解决方案。
鉴于 HashPartitioner 文档说:
[HashPartitioner] implements hash-based partitioning using Java's Object.hashCode.
假设我想按 kind
对 DeviceData
进行分区。
case class DeviceData(kind: String, time: Long, data: String)
通过覆盖 deviceData.hashCode()
方法并仅使用 kind
的哈希码对 RDD[DeviceData]
进行分区是否正确?
但是考虑到 HashPartitioner
需要多个分区参数,我很困惑是否需要提前知道种类的数量,如果种类多于分区会怎样?
如果我将分区数据写入磁盘,它在读取时将保持分区状态是否正确?
我的目标是打电话给
deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)
并且在迭代器中只有 DeviceData
具有相同的 kind
值。
使用 kind
做一个 groupByKey
怎么样?或者另一种 PairRDDFunctions
方法。
你让我觉得你并不真正关心分区,只是你在一个处理流程中获得所有特定种类的东西?
配对函数允许这样做:
rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
.foreachPartition(...)
但是,您可能会更安全一些:
rdd.keyBy(_.kind).reduceByKey(....)
或 mapValues
或其他一些配对函数,保证您获得完整的片段
Would it be correct to partition an RDD[DeviceData] by overwriting the deviceData.hashCode() method and use only the hashcode of kind?
不会的。如果您查看 Java Object.hashCode
文档,您将找到有关 hashCode
总合同的以下信息:
If two objects are equal according to the equals(Object) method, then calling the hashCode method on each of the two objects must produce the same integer result.
因此,除非纯粹基于 kind
设备的平等概念适合您的用例,而且我严重怀疑它是否适合,否则修改 HashCode
以获得所需的分区是一个坏主意。在一般情况下,您应该 implement your own partitioner 但这里不是必需的。
因为,除了 SQL 和 GraphX 中的特殊情况,partitionBy
仅在 PairRDD
上有效,因此创建 RDD[(String, DeviceData)]
并使用普通 HashPartitioner
deviceDataRdd.map(dev => (dev.kind, dev)).partitionBy(new HashPartitioner(n))
请记住,在 kind
具有低基数或高度偏斜分布的情况下,使用它进行分区可能不是最佳解决方案。