Spark 任务不可序列化(案例 类)
Spark Task not serializable (Case Classes)
当我使用在闭包内扩展 Serializable 的案例 class 或 class/object 时,Spark 抛出 Task not serializable。
object WriteToHbase extends Serializable {
def main(args: Array[String]) {
val csvRows: RDD[Array[String] = ...
val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val usersRDD = csvRows.map(row => {
new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
})
processUsers(sc: SparkContext, usersRDD, dateFormatter)
})
}
def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {
usersRDD.foreachPartition(part => {
val conf = HBaseConfiguration.create()
val table = new HTable(conf, tablename)
part.foreach(userRow => {
val id = userRow.id
val date1 = dateFormatter.parseDateTime(userRow.date1)
})
table.flushCommits()
table.close()
})
}
我的第一次尝试是使用案例 class:
case class UserTable(id: String, name: String, address: String, ...) extends Serializable
我的第二次尝试是使用 class 而不是 class:
class UserTable (val id: String, val name: String, val addtess: String, ...) extends Serializable {
}
我的第三次尝试是在 class 中使用伴随对象:
object UserTable extends Serializable {
def apply(id: String, name: String, address: String, ...) = new UserTable(id, name, address, ...)
}
很可能函数 "doSomething" 是在您的 class 上定义的,它是不可序列化的。而是将 "doSomething" 函数移动到伴随对象(例如,使其成为静态的)。
是 dateFormatter,我将它放在分区循环中,现在可以使用了。
usersRDD.foreachPartition(part => {
val id = userRow.id
val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val date1 = dateFormatter.parseDateTime(userRow.date1)
})
当我使用在闭包内扩展 Serializable 的案例 class 或 class/object 时,Spark 抛出 Task not serializable。
object WriteToHbase extends Serializable {
def main(args: Array[String]) {
val csvRows: RDD[Array[String] = ...
val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val usersRDD = csvRows.map(row => {
new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
})
processUsers(sc: SparkContext, usersRDD, dateFormatter)
})
}
def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {
usersRDD.foreachPartition(part => {
val conf = HBaseConfiguration.create()
val table = new HTable(conf, tablename)
part.foreach(userRow => {
val id = userRow.id
val date1 = dateFormatter.parseDateTime(userRow.date1)
})
table.flushCommits()
table.close()
})
}
我的第一次尝试是使用案例 class:
case class UserTable(id: String, name: String, address: String, ...) extends Serializable
我的第二次尝试是使用 class 而不是 class:
class UserTable (val id: String, val name: String, val addtess: String, ...) extends Serializable {
}
我的第三次尝试是在 class 中使用伴随对象:
object UserTable extends Serializable {
def apply(id: String, name: String, address: String, ...) = new UserTable(id, name, address, ...)
}
很可能函数 "doSomething" 是在您的 class 上定义的,它是不可序列化的。而是将 "doSomething" 函数移动到伴随对象(例如,使其成为静态的)。
是 dateFormatter,我将它放在分区循环中,现在可以使用了。
usersRDD.foreachPartition(part => {
val id = userRow.id
val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
val date1 = dateFormatter.parseDateTime(userRow.date1)
})