分区如何在 Spark Streaming 中工作?
How do partitions work in Spark Streaming?
我正在致力于提高 Spark 流应用程序的性能。
分区在流媒体环境中的工作方式。是等同于将文件加载到 spark 中还是始终只创建一个分区,使其仅在执行程序的一个核心中工作?
在 Spark Streaming(非结构化)中,分区的工作方式与您在使用 RDD
s 时所知道的完全一样。您可以使用
轻松检查分区数
rdd.getNumPartitions
正如您还标记了 spark-streaming-kafka 值得一提的是,您的输入 DStream 中的分区数将与您正在访问的 Kafka 主题中的分区数相匹配消费。
通常,对于 RDD,有 HashPartitioner and the RangePartitioner 可用的重新分区策略。您可以使用 HashPartitioner
by
rdd.partitionBy(new HashPartitioner(2))
其中 rdd
是 键值对 RDD,2
是分区数。
与结构化API相比,RDD 还具有应用自定义分区器的优势。为此,您可以扩展 Partitioner
class 并重写方法 numPartitions
和 getPartitions
,如下例所示:
import org.apache.spark.Partitioner
class TablePartitioner extends Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = {
val tableName = key.asInstanceOf[String]
if(tableName == "foo") 0 // partition count start at 0
else 1
}
}
我正在致力于提高 Spark 流应用程序的性能。
分区在流媒体环境中的工作方式。是等同于将文件加载到 spark 中还是始终只创建一个分区,使其仅在执行程序的一个核心中工作?
在 Spark Streaming(非结构化)中,分区的工作方式与您在使用 RDD
s 时所知道的完全一样。您可以使用
rdd.getNumPartitions
正如您还标记了 spark-streaming-kafka 值得一提的是,您的输入 DStream 中的分区数将与您正在访问的 Kafka 主题中的分区数相匹配消费。
通常,对于 RDD,有 HashPartitioner and the RangePartitioner 可用的重新分区策略。您可以使用 HashPartitioner
by
rdd.partitionBy(new HashPartitioner(2))
其中 rdd
是 键值对 RDD,2
是分区数。
与结构化API相比,RDD 还具有应用自定义分区器的优势。为此,您可以扩展 Partitioner
class 并重写方法 numPartitions
和 getPartitions
,如下例所示:
import org.apache.spark.Partitioner
class TablePartitioner extends Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = {
val tableName = key.asInstanceOf[String]
if(tableName == "foo") 0 // partition count start at 0
else 1
}
}