Spark 在 foreachRDD 操作中抛出 Not Serializable Exception

Spark throws Not Serializable Exception inside a foreachRDD operation

我正在尝试使用 Scala 和 Spark Streaming 实现观察者模式。这个想法是,每当我从流中(来自 kafka)收到一条记录时,我都会通过调用闭包内的方法 "notifyObservers" 来通知观察者。这是代码:

流由 kafka 实用程序提供。 notifyObserver 方法被定义为遵循模式规则的抽象 class。 我认为该错误与方法无法序列化这一事实有关。 我的想法正确吗?如果是,我应该遵循什么样的解决方案? 谢谢

def onMessageConsumed() = {
    stream.foreachRDD(rdd => {
      rdd.foreach(consumerRecord => {
        val record = new Record[T](consumerRecord.topic(), 
                                   consumerRecord.value())
        //notify observers with the record to compute
        notifyObservers(record)
      })
    })
  }

是的,发送给其他执行器(在foreach等中执行)的代码中使用的类应该实现Serializable接口。

此外,如果您的通知代码需要连接到某些资源,则需要将 foreach 包装到 foreachPartition 中,如下所示:

stream.foreachRDD(rdd => {
   rdd.foreachPartition(rddPartition =>
      // setup connection to external component      
      rddPartition.foreach(consumerRecord => {
        val record = new Record[T](consumerRecord.topic(), 
                                   consumerRecord.value())
        notifyObservers(record)
      })
      // close connection to external component
   })
  })