Spark 在 foreachRDD 操作中抛出 Not Serializable Exception
Spark throws Not Serializable Exception inside a foreachRDD operation
我正在尝试使用 Scala 和 Spark Streaming 实现观察者模式。这个想法是,每当我从流中(来自 kafka)收到一条记录时,我都会通过调用闭包内的方法 "notifyObservers" 来通知观察者。这是代码:
流由 kafka 实用程序提供。
notifyObserver 方法被定义为遵循模式规则的抽象 class。
我认为该错误与方法无法序列化这一事实有关。
我的想法正确吗?如果是,我应该遵循什么样的解决方案?
谢谢
def onMessageConsumed() = {
stream.foreachRDD(rdd => {
rdd.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
//notify observers with the record to compute
notifyObservers(record)
})
})
}
是的,发送给其他执行器(在foreach
等中执行)的代码中使用的类应该实现Serializable
接口。
此外,如果您的通知代码需要连接到某些资源,则需要将 foreach
包装到 foreachPartition
中,如下所示:
stream.foreachRDD(rdd => {
rdd.foreachPartition(rddPartition =>
// setup connection to external component
rddPartition.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
notifyObservers(record)
})
// close connection to external component
})
})
我正在尝试使用 Scala 和 Spark Streaming 实现观察者模式。这个想法是,每当我从流中(来自 kafka)收到一条记录时,我都会通过调用闭包内的方法 "notifyObservers" 来通知观察者。这是代码:
流由 kafka 实用程序提供。 notifyObserver 方法被定义为遵循模式规则的抽象 class。 我认为该错误与方法无法序列化这一事实有关。 我的想法正确吗?如果是,我应该遵循什么样的解决方案? 谢谢
def onMessageConsumed() = {
stream.foreachRDD(rdd => {
rdd.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
//notify observers with the record to compute
notifyObservers(record)
})
})
}
是的,发送给其他执行器(在foreach
等中执行)的代码中使用的类应该实现Serializable
接口。
此外,如果您的通知代码需要连接到某些资源,则需要将 foreach
包装到 foreachPartition
中,如下所示:
stream.foreachRDD(rdd => {
rdd.foreachPartition(rddPartition =>
// setup connection to external component
rddPartition.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
notifyObservers(record)
})
// close connection to external component
})
})