zkClient 不能被序列化,sparkstreaming 将 kafka 偏移量写入 zookeeper
zkClient can not be Serializabled , sparkstreaming write kafka offset to zookeeper
我的项目包括 ZooKeeper、Kafka 和 Spark Streaming。问题是当我尝试使用 Spark Streaming 将 Kafka 偏移量写入 ZooKeeper 时无法序列化 zkClient
。我看过几个 GitHub 项目,例如:https://github.com/ippontech/spark-kafka-source
//save the offsets
kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))
def saveOffsets(topic: String, rdd: RDD[_]): Unit = {
logger.info("Saving offsets to ZooKeeper")
val stopwatch = new Stopwatch()
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}").mkString(",")
logger.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}")
**ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)**
logger.info("Done updating offsets in ZooKeeper. Took " + stopwatch)
}
如代码:kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(rdd))
将在对象 offsetStore
中的驱动程序 private val zkClient = new ZkClient(zkHosts, 30000, 30000,ZKStringSerializer)
中执行,但无法序列化 zkClient
它是如何工作的?
你可以定义zkClient
为@transient lazy val
,这意味着它不会在驱动程序和执行程序之间序列化(这是@transient
部分),而是重新初始化,在包含上述代码的 class 的每个实例中(这是 lazy
部分)。
您可以在此处阅读有关此模式的更多信息:
http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
我的项目包括 ZooKeeper、Kafka 和 Spark Streaming。问题是当我尝试使用 Spark Streaming 将 Kafka 偏移量写入 ZooKeeper 时无法序列化 zkClient
。我看过几个 GitHub 项目,例如:https://github.com/ippontech/spark-kafka-source
//save the offsets
kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))
def saveOffsets(topic: String, rdd: RDD[_]): Unit = {
logger.info("Saving offsets to ZooKeeper")
val stopwatch = new Stopwatch()
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}").mkString(",")
logger.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}")
**ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)**
logger.info("Done updating offsets in ZooKeeper. Took " + stopwatch)
}
如代码:kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(rdd))
将在对象 offsetStore
中的驱动程序 private val zkClient = new ZkClient(zkHosts, 30000, 30000,ZKStringSerializer)
中执行,但无法序列化 zkClient
它是如何工作的?
你可以定义zkClient
为@transient lazy val
,这意味着它不会在驱动程序和执行程序之间序列化(这是@transient
部分),而是重新初始化,在包含上述代码的 class 的每个实例中(这是 lazy
部分)。
您可以在此处阅读有关此模式的更多信息: http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/