mapPartitions compile error: missing parameter type
I'm trying to read a stream from a Kafka source containing JSON records, using the pattern from the Learning Spark book:
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.util.Date
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.databind.DeserializationFeature
val spark = SparkSession.builder.appName("StreamingRetailTransactions").config("master", "local").getOrCreate()
val df = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", ...).
option("subscribe", "transactions_load").
option("kafka.security.protocol", "SASL_SSL").
...
load()
case class Invoice(
invoiceNo: Int,
stockCode: Int,
description: String,
...
storeId: Int,
transactionId: String
)
Then...
val df2 = df.selectExpr("CAST(value AS String)").as[String]
val df3 = df2.mapPartitions(records => {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
records.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Invoice]))
} catch {
case e: Exception => None
}
})
}, true)
val query = df3.writeStream.format("console").start()
However, I'm running into this issue:
df2: org.apache.spark.sql.Dataset[String] = [value: string]
<console>:63: error: missing parameter type
val df3 = df2.mapPartitions(records => {
^
Any ideas?
The mapPartitions method on a Dataset accepts only a single function,
func: Iterator[T] => Iterator[U]
as its argument (the preservesPartitioning flag you may be thinking of belongs to RDD.mapPartitions, not Dataset). With the extra true the compiler cannot resolve the call, so it cannot infer the type of records.
Remove the trailing true.
Try:
val df3 = df2.mapPartitions(records => {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
records.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Invoice]))
} catch {
case _: Exception => None
}
})
})
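For reference, here is a minimal sketch of how the corrected df3 plugs back into the rest of the pipeline. It assumes the Invoice case class and the import spark.implicits._ from above, which is what supplies the Encoder[Invoice] that mapPartitions needs:
// df3 is now a Dataset[Invoice]; the implicit Encoder[Invoice] is derived
// by spark.implicits._ because Invoice is a case class.
val query = df3.writeStream.
  format("console").
  option("truncate", "false").  // console sink option: print full rows
  start()

query.awaitTermination()  // block until the streaming query stops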