单元测试 Avro 通用反序列化

Unit testing Avro Generic deseralisation

在我的 spark 作业中,我将数据集保存为 Avro 格式:

val ds: Dataset[Foo] = ...
ds.write.mode(SaveMode.Overwrite).avro(path)

在下游应用程序中,我使用自定义反序列化器解析 Avro 文件:

def foo(record: GenericRecord): Foo = Foo(
  bar = record.get("bar").asInstanceOf[Int]
  baz = record.get("baz").asInstanceOf[Utf8].toString
)

我想为自定义解析器编写规范,而不必将 Spark 引入下游应用程序的依赖项中。但我不确定如何在 Spark 中模拟序列化行为。 Spark 源在 Avro 本身之外有很多自定义处理。

我能否可靠地模拟 Spark Avro 序列化以用作我的规范的输入?

你不需要模拟序列化,你只需要创建一个GenericRecord:

 val gr = new GenericRecordBuilder(schema)
   .put("bar", 10)
   .put("baz", "bat")
   .build

 foo(gr) shouldBe Foo(10, "baz")

这是一个在非 spark 环境中模拟 Spark 的 avro auto-schema 生成并测试 ser/de.

的规范
  private val sconf = new SparkConf().set("spark.sql.avro.compression.codec", "snappy")

  implicit val spark: SparkSession =
    SparkSession.builder().appName("SparkTest").master("local[*]").config(sconf).getOrCreate()

  implicit def sc: SparkContext = spark.sparkContext

  "a foo" should {
    "serialise to/from avro" in {
      val foo = Foo("bar")
      val tempPath = "./foo_avro"
      val rdd = sc.parallelize(List(foo), 1)
      val jsonRdd = rdd.map(_.toJson)
      spark.read.json(jsonRdd).write.mode(SaveMode.Overwrite).avro(tempPath)

      val readItBack = sc.newAPIHadoopFile(tempPath,
                                           classOf[AvroKeyInputFormat[GenericRecord]],
                                           classOf[AvroKey[GenericRecord]],
                                           classOf[NullWritable])

      val foos: Set[Foo] =
        readItBack.keys.map(x => FooAvroParser(x.datum()).parse).collect().toSet
      FileUtils.deleteDirectory(new File(tempPath))

      foos mustEqual Set(foo)
    }.set(minTestsOk = 5)

build.sbt

libraryDependencies ++= Seq(
  "com.databricks" %% "spark-avro" % "3.2.0" % "test" exclude ("org.apache.avro", "avro"),
  "org.apache.spark" %% "spark-sql" % "2.1.1" % "test",
  "org.apache.avro" % "avro" % "1.7.7"
)
dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.8.7" % "test",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.7" % "test",
  "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.7" % "test"
)