mapPartitions compile error: missing parameter type

I'm trying to use the pattern from the book Learning Spark to read a stream from a Kafka source containing JSON records:
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.util.Date

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.databind.DeserializationFeature

val spark = SparkSession.builder.appName("StreamingRetailTransactions").master("local").getOrCreate()


val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", ...).
                option("subscribe", "transactions_load").
                option("kafka.security.protocol", "SASL_SSL").
                ...
                load()

case class Invoice(
                    invoiceNo: Int,
                    stockCode: Int,
                    description: String,
                    ...
                    storeId: Int,
                    transactionId: String
                  )

Then...

val df2 = df.selectExpr("CAST(value AS String)").as[String]

val df3 = df2.mapPartitions(records => {

  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)

  records.flatMap(record => {
    try {
      Some(mapper.readValue(record, classOf[Invoice]))
    } catch {
      case e: Exception => None
    }
  })
}, true)

val query = df3.writeStream.format("console").start()

However, I'm running into this issue:

df2: org.apache.spark.sql.Dataset[String] = [value: string]
<console>:63: error: missing parameter type
       val df3 = df2.mapPartitions(records => {
                                   ^

Any ideas?

On a Dataset, the mapPartitions method takes only a function:

func: Iterator[T] => Iterator[U]

as its parameter.
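
For reference, the Dataset API declares it (approximately) as:

def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

so the result type (Invoice here) needs an implicit Encoder, which import spark.implicits._ already supplies for case classes.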

The two-argument overload mapPartitions(f, preservesPartitioning: Boolean) exists on RDD but not on Dataset, so with the trailing true the compiler cannot resolve the call and therefore cannot infer the type of records. Remove the trailing true:

val df3 = df2.mapPartitions(records => {

  // Build the Jackson mapper once per partition rather than once per record.
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)

  // Parse each JSON record, silently dropping anything that fails to deserialize.
  records.flatMap(record => {
    try {
      Some(mapper.readValue(record, classOf[Invoice]))
    } catch {
      case _: Exception => None
    }
  })
})
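
As a side note (a sketch, not part of the fix itself): writing the lambda's parameter type out explicitly is a cheap way to catch this kind of mistake, since the compiler can then point at the extra argument instead of failing type inference:

val df3 = df2.mapPartitions { (records: Iterator[String]) =>
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  records.flatMap { record =>
    try Some(mapper.readValue(record, classOf[Invoice]))
    catch { case _: Exception => None } // drop unparseable records
  }
}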