单元测试 Avro 通用反序列化
Unit testing Avro Generic deseralisation
在我的 spark 作业中,我将数据集保存为 Avro 格式:
val ds: Dataset[Foo] = ...
ds.write.mode(SaveMode.Overwrite).avro(path)
在下游应用程序中,我使用自定义反序列化器解析 Avro 文件:
def foo(record: GenericRecord): Foo = Foo(
bar = record.get("bar").asInstanceOf[Int]
baz = record.get("baz").asInstanceOf[Utf8].toString
)
我想为自定义解析器编写规范,而不必将 Spark 引入下游应用程序的依赖项中。但我不确定如何在 Spark 中模拟序列化行为。 Spark 源在 Avro 本身之外有很多自定义处理。
我能否可靠地模拟 Spark Avro 序列化以用作我的规范的输入?
你不需要模拟序列化,你只需要创建一个GenericRecord
:
val gr = new GenericRecordBuilder(schema)
.put("bar", 10)
.put("baz", "bat")
.build
foo(gr) shouldBe Foo(10, "baz")
这是一个在非 spark 环境中模拟 Spark 的 avro auto-schema 生成并测试 ser/de.
的规范
private val sconf = new SparkConf().set("spark.sql.avro.compression.codec", "snappy")
implicit val spark: SparkSession =
SparkSession.builder().appName("SparkTest").master("local[*]").config(sconf).getOrCreate()
implicit def sc: SparkContext = spark.sparkContext
"a foo" should {
"serialise to/from avro" in {
val foo = Foo("bar")
val tempPath = "./foo_avro"
val rdd = sc.parallelize(List(foo), 1)
val jsonRdd = rdd.map(_.toJson)
spark.read.json(jsonRdd).write.mode(SaveMode.Overwrite).avro(tempPath)
val readItBack = sc.newAPIHadoopFile(tempPath,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val foos: Set[Foo] =
readItBack.keys.map(x => FooAvroParser(x.datum()).parse).collect().toSet
FileUtils.deleteDirectory(new File(tempPath))
foos mustEqual Set(foo)
}.set(minTestsOk = 5)
build.sbt
libraryDependencies ++= Seq(
"com.databricks" %% "spark-avro" % "3.2.0" % "test" exclude ("org.apache.avro", "avro"),
"org.apache.spark" %% "spark-sql" % "2.1.1" % "test",
"org.apache.avro" % "avro" % "1.7.7"
)
dependencyOverrides ++= Set(
"com.fasterxml.jackson.core" % "jackson-core" % "2.8.7" % "test",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7" % "test",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7" % "test"
)
在我的 spark 作业中,我将数据集保存为 Avro 格式:
val ds: Dataset[Foo] = ...
ds.write.mode(SaveMode.Overwrite).avro(path)
在下游应用程序中,我使用自定义反序列化器解析 Avro 文件:
def foo(record: GenericRecord): Foo = Foo(
bar = record.get("bar").asInstanceOf[Int]
baz = record.get("baz").asInstanceOf[Utf8].toString
)
我想为自定义解析器编写规范,而不必将 Spark 引入下游应用程序的依赖项中。但我不确定如何在 Spark 中模拟序列化行为。 Spark 源在 Avro 本身之外有很多自定义处理。
我能否可靠地模拟 Spark Avro 序列化以用作我的规范的输入?
你不需要模拟序列化,你只需要创建一个GenericRecord
:
val gr = new GenericRecordBuilder(schema)
.put("bar", 10)
.put("baz", "bat")
.build
foo(gr) shouldBe Foo(10, "baz")
这是一个在非 spark 环境中模拟 Spark 的 avro auto-schema 生成并测试 ser/de.
的规范 private val sconf = new SparkConf().set("spark.sql.avro.compression.codec", "snappy")
implicit val spark: SparkSession =
SparkSession.builder().appName("SparkTest").master("local[*]").config(sconf).getOrCreate()
implicit def sc: SparkContext = spark.sparkContext
"a foo" should {
"serialise to/from avro" in {
val foo = Foo("bar")
val tempPath = "./foo_avro"
val rdd = sc.parallelize(List(foo), 1)
val jsonRdd = rdd.map(_.toJson)
spark.read.json(jsonRdd).write.mode(SaveMode.Overwrite).avro(tempPath)
val readItBack = sc.newAPIHadoopFile(tempPath,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val foos: Set[Foo] =
readItBack.keys.map(x => FooAvroParser(x.datum()).parse).collect().toSet
FileUtils.deleteDirectory(new File(tempPath))
foos mustEqual Set(foo)
}.set(minTestsOk = 5)
build.sbt
libraryDependencies ++= Seq(
"com.databricks" %% "spark-avro" % "3.2.0" % "test" exclude ("org.apache.avro", "avro"),
"org.apache.spark" %% "spark-sql" % "2.1.1" % "test",
"org.apache.avro" % "avro" % "1.7.7"
)
dependencyOverrides ++= Set(
"com.fasterxml.jackson.core" % "jackson-core" % "2.8.7" % "test",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7" % "test",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7" % "test"
)