为什么在 StatefulNetworkWordCount in spark streaming 示例中需要 HashPartioner?
Why is HashPartioner needed in the StatefulNetworkWordCount in spark streaming example?
我的问题是关于 StatefulNetworkWordCount 示例的:
Q1) stateDstream RDD 是由驱动程序或工作节点维护的,还是每个工作节点都有自己的完整状态 rdd 的本地副本?
Q2) 为什么我们需要在下面一行中使用 HashPartitioner :
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
幕后发生了什么?
回答你的两个问题:
1) DStream
生产的 RDD
分布在工人中。与非流式类似,这意味着 DStream
生成的每个 RDD
的记录分布在整个集群中(这就是分区在这里很重要的原因)。
2) 分区在这种情况下很重要,因为它决定了每个 RDD
迭代中的记录如何拆分。特别是对于像 updateStateByKey()
这样的转换,您往往会看到 RDD
的键在不同的批次间隔中保持不变。所以这里不言而喻,如果我们来自每个区间 RDD
的键排列在相同的分区中,这个函数可以更有效地工作并且可以更新一个键 within 的状态.
例如,让我们看看您链接的字数统计程序。让我们考虑两个一秒间隔的 RDD
(rdd1
在 t=1 和 rdd2
在 t=2)。假设生成的 rdd1
是针对文本 "hello world"
而生成的 rdd2
也看到了文本 "hello I'm world"
。如果没有分区,每个 RDD
的记录可以发送到不同工作人员的不同分区(t=1 处的 "hello"
和 t=2 处的 "hello"
可以发送到不同的位置)。这意味着对计数状态的更新将需要在每次迭代中重新排列记录以获得更新的计数。定义分区程序(并按照参数之一的指示记住!),我们将在同一分区中看到键 "hello"
和 "world"
,从而避免随机播放,并创建更高效的更新。
这里还需要注意的是,因为键可以改变,所以有一个参数可以切换是否记住分区程序。
我的问题是关于 StatefulNetworkWordCount 示例的:
Q1) stateDstream RDD 是由驱动程序或工作节点维护的,还是每个工作节点都有自己的完整状态 rdd 的本地副本?
Q2) 为什么我们需要在下面一行中使用 HashPartitioner :
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
幕后发生了什么?
回答你的两个问题:
1) DStream
生产的 RDD
分布在工人中。与非流式类似,这意味着 DStream
生成的每个 RDD
的记录分布在整个集群中(这就是分区在这里很重要的原因)。
2) 分区在这种情况下很重要,因为它决定了每个 RDD
迭代中的记录如何拆分。特别是对于像 updateStateByKey()
这样的转换,您往往会看到 RDD
的键在不同的批次间隔中保持不变。所以这里不言而喻,如果我们来自每个区间 RDD
的键排列在相同的分区中,这个函数可以更有效地工作并且可以更新一个键 within 的状态.
例如,让我们看看您链接的字数统计程序。让我们考虑两个一秒间隔的 RDD
(rdd1
在 t=1 和 rdd2
在 t=2)。假设生成的 rdd1
是针对文本 "hello world"
而生成的 rdd2
也看到了文本 "hello I'm world"
。如果没有分区,每个 RDD
的记录可以发送到不同工作人员的不同分区(t=1 处的 "hello"
和 t=2 处的 "hello"
可以发送到不同的位置)。这意味着对计数状态的更新将需要在每次迭代中重新排列记录以获得更新的计数。定义分区程序(并按照参数之一的指示记住!),我们将在同一分区中看到键 "hello"
和 "world"
,从而避免随机播放,并创建更高效的更新。
这里还需要注意的是,因为键可以改变,所以有一个参数可以切换是否记住分区程序。