What is the best way to create Spark DataFrames using Spark Testing Base library?
I am writing a unit test for a Spark method that takes multiple DataFrames as input parameters and returns one DataFrame. The Spark method looks like this:
class Processor {
  def process(df1: DataFrame, df2: DataFrame): DataFrame = {
    // process and return the resulting DataFrame
  }
}
The existing unit test code looks like this:
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.scalatest.{FlatSpec, Matchers}
class TestProcess extends FlatSpec with DataFrameSuiteBase with Matchers {

  val p: Processor = new Processor

  "process()" should "return only one row" in {
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1DF = spark.createDataFrame(df1RDD).toDF()
    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2DF = spark.createDataFrame(df2RDD).toDF()
    val result = p.process(df1DF, df2DF)
  }

  it should "return spark row" in {
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1DF = spark.createDataFrame(df1RDD).toDF()
    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2DF = spark.createDataFrame(df2RDD).toDF()
    val result = p.process(df1DF, df2DF)
  }
}
This code works fine, but there is a problem: the code that creates the RDDs and DataFrames is duplicated in every test method. When I try to create the RDDs outside the test methods, or inside a BeforeAndAfterAll() method, I get an error saying that sc is not available. It seems the Spark Testing Base library only initializes the sc and spark variables inside the test methods.

I would like to know whether there is any way to avoid writing this repeated code.
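One workaround, sketched here as an assumption rather than taken from the original post: DataFrameSuiteBase mixes in BeforeAndAfterAll and starts the contexts in its own beforeAll(), so overriding beforeAll(), calling super.beforeAll() first, and only then building the shared DataFrames should avoid the "sc not available" error (the class name TestProcessSharedFixtures is made up for illustration):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.scalatest.{FlatSpec, Matchers}

class TestProcessSharedFixtures extends FlatSpec with DataFrameSuiteBase with Matchers {

  val p: Processor = new Processor

  // Shared fixtures, assigned once the SparkContext exists.
  var df1DF: DataFrame = _
  var df2DF: DataFrame = _

  override def beforeAll(): Unit = {
    super.beforeAll() // let DataFrameSuiteBase start sc and spark first
    df1DF = spark.createDataFrame(sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))).toDF()
    df2DF = spark.createDataFrame(sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))).toDF()
  }

  "process()" should "return only one row" in {
    p.process(df1DF, df2DF).count should equal(1)
  }
}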
Update: the code after switching from FlatSpec to WordSpec:
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.apache.spark.sql.DataFrame
import org.scalamock.scalatest.MockFactory
import org.scalatest.{Matchers, WordSpec}
class TestProcess extends WordSpec with DataFrameSuiteBase with Matchers {

  val p: Processor = new Processor

  "process()" should {
    val df1RDD = sc.parallelize(Seq(
      ("a", 12, 98999),
      ("b", 42, 99)
    ))
    val df1DF = spark.createDataFrame(df1RDD).toDF()
    val df2RDD = sc.parallelize(Seq(
      ("X", 12, "foo", "spark"),
      ("Z", 42, "bar", "storm")
    ))
    val df2DF = spark.createDataFrame(df2RDD).toDF()
    val result = p.process(df1DF, df2DF)

    "return only one row" in {
      result.count should equal(1)
    }

    "return spark row" in {
      // assertions to check whether a row containing 'spark' in the last
      // column is in the result or not (see the assertion sketch below)
    }
  }
}
I am using WordSpec instead of FlatSpec because it allows grouping the common initialization before the test clauses, like this:
"process()" should {
df1RDD = sc.parallelize(Seq("a", 12, 98999),Seq("b", 42, 99))
df1DF = spark.createDataFrame(df1RDD).toDF()
df2RDD = sc.parallelize(Seq("X", 12, "foo", "spark"), Seq("Z", 42, "bar", "storm"))
df2DF = spark.createDataFrame(df2RDD).toDF()
"return only one row" in {
....
}
"return spark row" in {
....
}
}
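For the "return spark row" clause, one possible assertion, as a sketch: it assumes process() keeps df2's "spark"/"storm" column as the last column of the result, which the original post does not specify:

// Collect the result and check whether any row ends with "spark".
val hasSparkRow = result.collect().exists { row =>
  row.get(row.length - 1) == "spark"
}
hasSparkRow should be(true)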
Edit: Also, given the following two lines, it is hard to justify using the spark-testing-base library at all:
val spark = SparkSession.builder.master("local[1]").getOrCreate
val sc = spark.sparkContext
Add these at the top of your class, and you have a SparkContext and everything set up, with no NPEs.
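Put together, a minimal self-contained sketch of this suggestion (the class name TestProcessPlain is made up here, and it assumes the Processor class from the question is on the classpath):

import org.apache.spark.sql.SparkSession
import org.scalatest.{Matchers, WordSpec}

class TestProcessPlain extends WordSpec with Matchers {

  // A plain local SparkSession; no test library required.
  val spark = SparkSession.builder.master("local[1]").getOrCreate
  val sc = spark.sparkContext

  val p: Processor = new Processor

  "process()" should {
    // sc and spark are ordinary vals here, so the fixtures
    // can live outside the individual test clauses.
    val df1DF = spark.createDataFrame(sc.parallelize(Seq(("a", 12, 98999), ("b", 42, 99)))).toDF()
    val df2DF = spark.createDataFrame(sc.parallelize(Seq(("X", 12, "foo", "spark"), ("Z", 42, "bar", "storm")))).toDF()
    val result = p.process(df1DF, df2DF)

    "return only one row" in {
      result.count should equal(1)
    }
  }
}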
Edit: I have just confirmed through my own tests that spark-testing-base does not play well with WordSpec. If you still want to use it, consider filing a bug report with the library author, because this is definitely a problem on spark-testing-base's side.