在不重启批处理的情况下刷新 spark streaming 中的缓存值

Refresh cached values in spark streaming without reboot the batch

也许问题太简单了,至少看起来是这样,但我有以下问题:

一个。在spark streaming进程中执行spark submit。

 ccc.foreachRDD(rdd => {
          rdd.repartition(20).foreachPartition(p => {

               val repo = getReposX
               t foreach (g => {
    .................

乙。 getReposX 是一个函数,它在 mongoDB 中进行查询,恢复一个 Map 机智 key/value 在每个进程的执行者中都是必需的。

C.进入 foreach 中的每个 g 我管理这张地图 "cached"

问题是当 mongo 集合中发生任何变化时,我没有观察或没有检测到变化,因此我正在管理未更新的地图。我的问题是:我怎样才能得到它?是的,我知道如果我重新启动 spark-submit 并再次执行驱动程序是可以的,但否则我将永远不会在我的地图中看到更新。

有什么想法或建议吗? 问候。

最后我开发了一个解决方案。首先,我更详细地解释了这个问题,因为我真正想知道的是如何实现一个对象或 "cache",它每隔一段时间就会刷新一次,或者按某种顺序刷新,而不需要重新启动 spark streaming process,也就是活着刷新。

在我的例子中,这个 "cache" 或刷新的对象是一个连接到 mongoDB 集合的对象(Singleton),以恢复每个执行程序使用并缓存在内存中的 HashMap作为一个好的单身人士。这样做的问题是,一旦执行了 spark streaming 提交,它就会将该对象缓存在内存中,但除非进程重新启动,否则它不会刷新。把一个广播想象成变量达到1000时刷新的计数器模式,但是这些都是只读的,不能修改。想到一个计数器,但是这些只能被驱动读取。

最后我的解决方案是,在加载 mongo 集合和缓存的对象的初始化块中,我实现了这个:

    //Initialization Block
      {
          val ex = new ScheduledThreadPoolExecutor(1)
          val task = new Runnable {
            def run() = {
              logger.info("Refresh - Inicialization")
              initCache
            }
          }
          val f = ex.scheduleAtFixedRate(task, 0, TIME_REFRES, TimeUnit.SECONDS)
       }


initCache is nothing more than a function that connects mongo and loads a collection:

    var cache = mutable.HashMap[String,Description]()
    def initCache():mutable.HashMap[String, Description]={
           val serverAddresses = Functions.getMongoServers(SERVER, PORT)

        val mongoConnectionFactory = new MongoCollectionFactory(serverAddresses,DATABASE,COLLECTION,USERNAME,PASSWORD)

        val collection = mongoConnectionFactory.getMongoCollection()

        val docs = collection.find.iterator()
        cache.clear()
        while (docs.hasNext) {

          var doc = docs.next

          cache.put(...............
        }

        cache
      }

这样,spark streaming submit启动后,每个task会多开一个task,每隔X次(我的情况是1或2小时)刷新单例集合的值,并且始终恢复实例化的值:

def getCache():mutable.HashMap[String, Description]={
      cache
}