State management not serializable

In my application I want to keep track of multiple states. So I tried to encapsulate the whole state-management logic in a class StateManager, like this:

@SerialVersionUID(xxxxxxxL)
class StateManager(
    inputStream: DStream[(String, String)],
    initialState: RDD[(String, String)]
) extends Serializable {
  lazy val state = inputStream.mapWithState(stateSpec).map(_.get)
  lazy val stateSpec = StateSpec
    .function(trackStateFunc _)
    .initialState(initialState)
    .timeout(Seconds(30))
  def trackStateFunc(key: String, value: Option[String], state: State[String]): Option[(String, String)] = { /* ... */ }
}

object StateManager {
  def apply(dStream: DStream[(String, String)], initialState: RDD[(String, String)]) =
    new StateManager(dStream, initialState)
}

The @SerialVersionUID(xxxxxxxL) ... extends Serializable part was an attempt to solve my problem.

But when I call StateManager from my main class, like this:

val lStreamingEnvironment = StreamingEnvironment(streamingWindow, checkpointDirectory)
val stateManager = StateManager(lStreamingEnvironment.sparkContext, 1, None)
val state = stateManager.state(lKafkaStream)

state.foreachRDD(_.foreach(println))

(the StreamingEnvironment class is shown further down), I get:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
[...]
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.

The error is clear enough, but I still don't understand what triggers it.

Where is it triggered? And what can I do to fix this and end up with a reusable class?


The StreamingEnvironment class, in case it helps:

class StreamingEnvironment(mySparkConf: SparkConf, myKafkaConf: KafkaConf, myStreamingWindow: Duration, myCheckPointDirectory: String) {
  val sparkContext = SparkContext.getOrCreate(mySparkConf)
  lazy val streamingContext = new StreamingContext(sparkContext, myStreamingWindow)

  streamingContext.checkpoint(myCheckPointDirectory)
  streamingContext.remember(Minutes(1))

  def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
}

object StreamingEnvironment {
  def apply(streamingWindow: Duration, checkpointDirectory: String) = {
    //setup sparkConf and kafkaConf

    new StreamingEnvironment(sparkConf, kafkaConf, streamingWindow, checkpointDirectory)
  }
}

When we lift a method into a function, as in function(trackStateFunc _), the outer reference to the parent class becomes part of that function reference. Declaring trackStateFunc directly as a function (i.e. as a val) will probably fix the problem.
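
As an illustration only, a minimal sketch of that variant, keeping the question's names and types (the mapping logic is just a placeholder, since the real trackStateFunc body isn't shown):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

class StateManager(
    inputStream: DStream[(String, String)],
    initialState: RDD[(String, String)]
) {
  // A function value carries no hidden reference to the enclosing StateManager,
  // so only the function itself ends up in the serialized task closure.
  val trackStateFunc: (String, Option[String], State[String]) => Option[(String, String)] =
    (key, value, state) => value.map(v => (key, v)) // placeholder logic

  lazy val stateSpec = StateSpec
    .function(trackStateFunc)
    .initialState(initialState)
    .timeout(Seconds(30))

  lazy val state = inputStream.mapWithState(stateSpec).map(_.get)
}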

Also note that marking a class Serializable doesn't magically make it so. The DStream is not serializable and should be annotated @transient, which will probably fix the problem as well.
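
A second sketch under the same assumptions, showing @transient on the constructor parameters:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// @transient tells Java serialization to skip these fields, so the
// non-serializable DStream/RDD are never dragged into a serialized task closure.
class StateManager(
    @transient private val inputStream: DStream[(String, String)],
    @transient private val initialState: RDD[(String, String)]
) extends Serializable {
  // stateSpec and state would be defined exactly as in the previous sketch
}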