SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database

I am using Apache Spark 1.5.1 and trying to connect to a local SQLite database named clinton.db. Creating a data frame from a table of the database works fine, but when I do some operations on the created object, I get the error below, which says "SQL error or missing database (Connection is closed)". Interestingly, I still get the result of the operation. Any idea what I can do to solve the problem, i.e. avoid the error?

Start command for spark-shell:

../spark/bin/spark-shell --master local[8] --jars ../libraries/sqlite-jdbc-3.8.11.1.jar --classpath ../libraries/sqlite-jdbc-3.8.11.1.jar

Reading from the database:

val emails = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:sqlite:../data/clinton.sqlite", "dbtable" -> "Emails")).load()

Simple count (fails):

emails.count

The error:

15/09/30 09:06:39 WARN JDBCRDD: Exception closing statement
java.sql.SQLException: [SQLITE_ERROR] SQL error or missing database (Connection is closed)
	at org.sqlite.core.DB.newSQLException(DB.java:890)
	at org.sqlite.core.CoreStatement.internalClose(CoreStatement.java:109)
	at org.sqlite.jdbc3.JDBC3Statement.close(JDBC3Statement.java:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$anon$$close(JDBCRDD.scala:454)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$$anonfun.apply(JDBCRDD.scala:358)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$$anonfun.apply(JDBCRDD.scala:358)
	at org.apache.spark.TaskContextImpl$$anon.onTaskCompletion(TaskContextImpl.scala:60)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:79)
	at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted.apply(TaskContextImpl.scala:77)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77)
	at org.apache.spark.scheduler.Task.run(Task.scala:90)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

res1: Long = 7945

I got the same error, and the important line is right before the exception:

15/11/30 12:13:02 INFO jdbc.JDBCRDD: closed connection

15/11/30 12:13:02 WARN jdbc.JDBCRDD: Exception closing statement
java.sql.SQLException: [SQLITE_ERROR] SQL error or missing database (Connection is closed)
	at org.sqlite.core.DB.newSQLException(DB.java:890)
	at org.sqlite.core.CoreStatement.internalClose(CoreStatement.java:109)
	at org.sqlite.jdbc3.JDBC3Statement.close(JDBC3Statement.java:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$anon$$close(JDBCRDD.scala:454)

So Spark first closes the JDBC connection successfully, and then fails to close the JDBC statement, because the connection it belongs to is already gone.


Looking at the source code, close() is called twice:

Line 358 of org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD (Spark 1.5.1):

context.addTaskCompletionListener{ context => close() }

And line 469:

override def hasNext: Boolean = {
  if (!finished) {
    if (!gotNext) {
      nextValue = getNext()
      if (finished) {
        close()
      }
      gotNext = true
    }
  }
  !finished
}

If you look at the close() method (line 443):

def close() {
  if (closed) return

You can see that it checks the variable closed, but that value is never set to true. So the second call to close() (from the task-completion listener) runs the full cleanup again, and closing the statement fails because its connection has already been closed.
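To make the guard effective, the flag just needs to be set once cleanup has run. A minimal sketch of an idempotent close(), with the same `closed` flag as the JDBCRDD snippet above (note `ResourceIterator` and `closeCount` are invented for this illustration; this is not the actual Spark patch):

```scala
// Hypothetical sketch of an idempotent close() guard.
class ResourceIterator {
  private var closed = false
  var closeCount = 0 // counts how many times cleanup actually runs (demo only)

  def isClosed: Boolean = closed

  def close(): Unit = {
    if (closed) return // second call becomes a no-op
    closeCount += 1    // stands in for stmt.close(); conn.close()
    closed = true      // the assignment the Spark 1.5.1 code never makes
  }
}
```

With the flag actually set, the close() triggered from hasNext and the later call from the task-completion listener no longer try to free the statement twice.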

If I read this correctly, the bug is still present in master. I have filed a bug report.