Spark:如何使用每个分区的 mapPartition 和 create/close 连接
Spark : How to use mapPartition and create/close connection per partition
所以,我想对我的 spark DataFrame 做一些操作,将它们写入 DB 并在最后创建另一个 DataFrame。看起来像这样:
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
iterator.map(
row => {
addRowToBatch(row)
convertRowToObject(row)
})
conn.writeTheBatchToDB()
conn.close()
})
.toDF()
这给了我一个错误,因为 mapPartitions 需要 return 类型的 Iterator[NotInferedR]
,但这里是 Unit
。我知道这可以通过 forEachPartition 实现,但我也想进行映射。单独进行将是一项开销(额外的火花工作)。怎么办?
谢谢!
匿名函数实现中的最后一个表达式必须是 return 值:
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
// using toList to force eager computation - make it happen now when connection is open
val result = iterator.map(/* the same... */).toList
conn.writeTheBatchToDB()
conn.close()
result.iterator
}
).toDF()
在大多数情况下,如果不减慢作业速度,急于使用迭代器将导致执行失败。因此,我所做的是检查迭代器是否已经为空,然后执行清理例程。
rdd.mapPartitions(itr => {
val conn = new DbConnection
itr.map(data => {
val yourActualResult = // do something with your data and conn here
if(itr.isEmpty) conn.close // close the connection
yourActualResult
})
})
起初以为这是一个 spark 问题,但实际上是一个 scala 问题。 http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean
所以,我想对我的 spark DataFrame 做一些操作,将它们写入 DB 并在最后创建另一个 DataFrame。看起来像这样:
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
iterator.map(
row => {
addRowToBatch(row)
convertRowToObject(row)
})
conn.writeTheBatchToDB()
conn.close()
})
.toDF()
这给了我一个错误,因为 mapPartitions 需要 return 类型的 Iterator[NotInferedR]
,但这里是 Unit
。我知道这可以通过 forEachPartition 实现,但我也想进行映射。单独进行将是一项开销(额外的火花工作)。怎么办?
谢谢!
匿名函数实现中的最后一个表达式必须是 return 值:
import sqlContext.implicits._
val newDF = myDF.mapPartitions(
iterator => {
val conn = new DbConnection
// using toList to force eager computation - make it happen now when connection is open
val result = iterator.map(/* the same... */).toList
conn.writeTheBatchToDB()
conn.close()
result.iterator
}
).toDF()
在大多数情况下,如果不减慢作业速度,急于使用迭代器将导致执行失败。因此,我所做的是检查迭代器是否已经为空,然后执行清理例程。
rdd.mapPartitions(itr => {
val conn = new DbConnection
itr.map(data => {
val yourActualResult = // do something with your data and conn here
if(itr.isEmpty) conn.close // close the connection
yourActualResult
})
})
起初以为这是一个 spark 问题,但实际上是一个 scala 问题。 http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean