模拟 SparkSession 进行单元测试
Mocking SparkSession for unit testing
我的 spark 应用程序中有一个方法可以从 MySQL 数据库加载数据。该方法看起来像这样。
trait DataManager {
val session: SparkSession
def loadFromDatabase(input: Input): DataFrame = {
session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
}
}
该方法除了执行 jdbc
方法并从数据库加载数据外,什么都不做。我怎样才能测试这个方法?标准方法是创建对象 session
的模拟,它是 SparkSession
的实例。但是由于 SparkSession
有一个私有构造函数,我无法使用 ScalaMock 模拟它。
这里的主要问题是我的函数是一个纯副作用函数(副作用是从关系数据库中提取数据)并且我如何对这个函数进行单元测试,因为我在模拟时遇到了问题SparkSession
.
那么有什么方法可以模拟 SparkSession
或比模拟更好的方法来测试此方法吗?
对于你的情况,我建议不要模拟 SparkSession。这或多或少会模拟整个函数(无论如何你都可以这样做)。如果您想测试此功能,我的建议是 运行 一个嵌入式数据库(如 H2)并使用真正的 SparkSession。为此,您需要向 DataManager
提供 SparkSession。
未经测试的草图:
您的代码:
class DataManager (session: SparkSession) {
def loadFromDatabase(input: Input): DataFrame = {
session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
}
}
你的test-case:
class DataManagerTest extends FunSuite with BeforeAndAfter {
override def beforeAll() {
Connection conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", "");
// your insert statements goes here
conn.close()
}
test ("should load data from database") {
val dm = DataManager(SparkSession.builder().getOrCreate())
val input = Input(jdbcUrl = "jdbc:h2:~/test", selectQuery="SELECT whateveryounedd FROM whereeveryouputit ")
val expectedData = dm.loadFromDatabase(input)
assert(//expectedData)
}
}
您可以使用 mockito scala 来模拟 SparkSession,如 this article 所示。
我的 spark 应用程序中有一个方法可以从 MySQL 数据库加载数据。该方法看起来像这样。
trait DataManager {
val session: SparkSession
def loadFromDatabase(input: Input): DataFrame = {
session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
}
}
该方法除了执行 jdbc
方法并从数据库加载数据外,什么都不做。我怎样才能测试这个方法?标准方法是创建对象 session
的模拟,它是 SparkSession
的实例。但是由于 SparkSession
有一个私有构造函数,我无法使用 ScalaMock 模拟它。
这里的主要问题是我的函数是一个纯副作用函数(副作用是从关系数据库中提取数据)并且我如何对这个函数进行单元测试,因为我在模拟时遇到了问题SparkSession
.
那么有什么方法可以模拟 SparkSession
或比模拟更好的方法来测试此方法吗?
对于你的情况,我建议不要模拟 SparkSession。这或多或少会模拟整个函数(无论如何你都可以这样做)。如果您想测试此功能,我的建议是 运行 一个嵌入式数据库(如 H2)并使用真正的 SparkSession。为此,您需要向 DataManager
提供 SparkSession。
未经测试的草图:
您的代码:
class DataManager (session: SparkSession) {
def loadFromDatabase(input: Input): DataFrame = {
session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
}
}
你的test-case:
class DataManagerTest extends FunSuite with BeforeAndAfter {
override def beforeAll() {
Connection conn = DriverManager.getConnection("jdbc:h2:~/test", "sa", "");
// your insert statements goes here
conn.close()
}
test ("should load data from database") {
val dm = DataManager(SparkSession.builder().getOrCreate())
val input = Input(jdbcUrl = "jdbc:h2:~/test", selectQuery="SELECT whateveryounedd FROM whereeveryouputit ")
val expectedData = dm.loadFromDatabase(input)
assert(//expectedData)
}
}
您可以使用 mockito scala 来模拟 SparkSession,如 this article 所示。