并行处理 DStream 中的 RDD
Processing RDDs in a DStream in parallel
我遇到了以下在 Spark Streaming 中处理消息的代码:
val listRDD = ssc.socketTextStream(host, port)
listRDD.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
// Should I start a separate thread for each RDD and/or Partition?
partition.foreach(message => {
Processor.processMessage(message)
})
})
})
这对我有用,但我不确定这是否是最好的方法。我知道一个 DStream 由 "one to many" 个 RDD 组成,但是这段代码一个接一个地顺序处理 RDD,对吧?难道没有更好的方法——方法或函数——我可以使用它来并行处理 DStream 中的所有 RDD?我应该为每个 RDD and/or 分区启动一个单独的线程吗?我是否误解了这段代码在 Spark 下的工作方式?
不知何故,我认为这段代码没有利用 Spark 中的并行性。
为了方便和高效,流被分成小的 RDD(检查 micro-batching。但你真的不需要将每个 RDD 分成多个分区,甚至不需要将流分成 RDD。
这完全取决于 Processor.processMessage
到底是什么。如果它是一个单一的转换函数,你可以只做 listRDD.map(Processor.processMessage)
然后你会得到一个流,无论处理消息的结果是什么,并行计算,你不需要做太多其他事情。
如果 Processor
是一个保持状态的可变对象(比如,计算消息的数量),那么事情就更复杂了,因为您需要定义许多这样的对象来考虑并行性,并且还需要稍后以某种方式合并结果。
我遇到了以下在 Spark Streaming 中处理消息的代码:
val listRDD = ssc.socketTextStream(host, port)
listRDD.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
// Should I start a separate thread for each RDD and/or Partition?
partition.foreach(message => {
Processor.processMessage(message)
})
})
})
这对我有用,但我不确定这是否是最好的方法。我知道一个 DStream 由 "one to many" 个 RDD 组成,但是这段代码一个接一个地顺序处理 RDD,对吧?难道没有更好的方法——方法或函数——我可以使用它来并行处理 DStream 中的所有 RDD?我应该为每个 RDD and/or 分区启动一个单独的线程吗?我是否误解了这段代码在 Spark 下的工作方式?
不知何故,我认为这段代码没有利用 Spark 中的并行性。
为了方便和高效,流被分成小的 RDD(检查 micro-batching。但你真的不需要将每个 RDD 分成多个分区,甚至不需要将流分成 RDD。
这完全取决于 Processor.processMessage
到底是什么。如果它是一个单一的转换函数,你可以只做 listRDD.map(Processor.processMessage)
然后你会得到一个流,无论处理消息的结果是什么,并行计算,你不需要做太多其他事情。
如果 Processor
是一个保持状态的可变对象(比如,计算消息的数量),那么事情就更复杂了,因为您需要定义许多这样的对象来考虑并行性,并且还需要稍后以某种方式合并结果。