Spark : DB connection per Spark RDD partition and do mapPartition

I would like to do a mapPartitions on my spark rdd,

    val newRd = myRdd.mapPartitions(
      partition => {

        val connection = new DbConnection /*creates a db connection per partition*/

        val newPartition = partition.map(
           record => {
             readMatchingFromDB(record, connection)
         })
        connection.close()
        newPartition
      })

But this gives me a "connection already closed" exception, as expected, because before the control reaches the .map() my connection is already closed. I would like to create a connection per RDD partition and close it properly. How can I achieve this?

Thanks!

As mentioned in the discussion - the issue stems from the laziness of the map operation on the iterator partition. This laziness means that for each partition a connection is created and immediately closed, and only later (when the RDD is acted upon) is readMatchingFromDB actually called.
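
To see that laziness concretely, here is a minimal sketch in plain Scala (no Spark involved; the values are made up for illustration) - nothing is printed until the iterator is actually consumed:

    val lazyIt = Iterator(1, 2, 3).map { x =>
      println(s"processing $x") // runs only when the iterator is consumed
      x
    }
    println("map() has returned, but nothing has been processed yet")
    lazyIt.toList // only now is "processing ..." printed for each element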

To fix this, you should force an eager traversal of the iterator before closing the connection, e.g. by converting it into a list (and then back into an iterator):

    val newRd = myRdd.mapPartitions(partition => {
      val connection = new DbConnection /*creates a db connection per partition*/

      val newPartition = partition.map(record => {
        readMatchingFromDB(record, connection)
      }).toList // consumes the iterator, thus calls readMatchingFromDB

      connection.close()
      newPartition.iterator // create a new iterator
    })
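
Note that toList materializes the whole partition in memory. If that is a concern, one possible alternative (a sketch only; Result is a placeholder for whatever type readMatchingFromDB returns) is to wrap the mapped iterator and close the connection once it has been fully consumed:

    val newRd = myRdd.mapPartitions { partition =>
      val connection = new DbConnection
      val mapped = partition.map(record => readMatchingFromDB(record, connection))
      new Iterator[Result] {
        def hasNext: Boolean = {
          val more = mapped.hasNext
          if (!more) connection.close() // close once the partition is exhausted
          more
        }
        def next(): Result = mapped.next()
      }
    }

The trade-off is that the connection leaks if the consumer stops before fully draining the iterator, and close() may be invoked more than once if hasNext is re-checked after exhaustion, so it should be safe to call repeatedly.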

Alternatively, you can cache a single DB connection per executor JVM and reuse it across partitions; this is particularly useful for streaming apps:

    rdd.foreachPartitionAsync(iterator -> {

        // This object will be cached inside each executor JVM. The connection is
        // created on the first call and reused from then on.
        // Very useful for streaming apps.
        DBConn conn = DBConn.getConnection();
        while (iterator.hasNext()) {
            conn.read(iterator.next());
        }
    });

    public class DBConn {
        private static DBConn dbObj = null;

        // Singleton accessor: creates the connection once per JVM and returns
        // the same instance on every subsequent call.
        public static synchronized DBConn getConnection() {
            if (dbObj == null) {
                dbObj = new DBConn();
            }
            return dbObj;
        }

        // Look up the matching data for one record (implementation omitted).
        public void read(Object record) { /* ... */ }
    }
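
Applied to the original Scala mapPartitions use case, the same per-executor idea might look like the following sketch (ConnectionHolder is a hypothetical name; DbConnection and readMatchingFromDB are the names from the question):

    // Hypothetical singleton: a Scala object is initialized at most once per JVM,
    // so each executor lazily creates one connection and reuses it for every
    // partition it processes. The connection is intentionally never closed here,
    // and DbConnection must be safe to share between tasks on the same executor.
    object ConnectionHolder {
      lazy val connection: DbConnection = new DbConnection
    }

    val newRd = myRdd.mapPartitions { partition =>
      val connection = ConnectionHolder.connection
      partition.map(record => readMatchingFromDB(record, connection))
    }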