在不重启批处理的情况下刷新 spark streaming 中的缓存值
Refresh cached values in spark streaming without reboot the batch
也许问题太简单了,至少看起来是这样,但我有以下问题:
一个。在spark streaming进程中执行spark submit。
ccc.foreachRDD(rdd => {
rdd.repartition(20).foreachPartition(p => {
val repo = getReposX
t foreach (g => {
.................
乙。 getReposX 是一个函数,它在 mongoDB 中进行查询,恢复一个 Map 机智 key/value 在每个进程的执行者中都是必需的。
C.进入 foreach 中的每个 g 我管理这张地图 "cached"
问题是当 mongo 集合中发生任何变化时,我没有观察或没有检测到变化,因此我正在管理未更新的地图。我的问题是:我怎样才能得到它?是的,我知道如果我重新启动 spark-submit 并再次执行驱动程序是可以的,但否则我将永远不会在我的地图中看到更新。
有什么想法或建议吗?
问候。
最后我开发了一个解决方案。首先,我更详细地解释了这个问题,因为我真正想知道的是如何实现一个对象或 "cache",它每隔一段时间就会刷新一次,或者按某种顺序刷新,而不需要重新启动 spark streaming process,也就是活着刷新。
在我的例子中,这个 "cache" 或刷新的对象是一个连接到 mongoDB 集合的对象(Singleton),以恢复每个执行程序使用并缓存在内存中的 HashMap作为一个好的单身人士。这样做的问题是,一旦执行了 spark streaming 提交,它就会将该对象缓存在内存中,但除非进程重新启动,否则它不会刷新。把一个广播想象成变量达到1000时刷新的计数器模式,但是这些都是只读的,不能修改。想到一个计数器,但是这些只能被驱动读取。
最后我的解决方案是,在加载 mongo 集合和缓存的对象的初始化块中,我实现了这个:
//Initialization Block
{
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
logger.info("Refresh - Inicialization")
initCache
}
}
val f = ex.scheduleAtFixedRate(task, 0, TIME_REFRES, TimeUnit.SECONDS)
}
initCache is nothing more than a function that connects mongo and loads a collection:
var cache = mutable.HashMap[String,Description]()
def initCache():mutable.HashMap[String, Description]={
val serverAddresses = Functions.getMongoServers(SERVER, PORT)
val mongoConnectionFactory = new MongoCollectionFactory(serverAddresses,DATABASE,COLLECTION,USERNAME,PASSWORD)
val collection = mongoConnectionFactory.getMongoCollection()
val docs = collection.find.iterator()
cache.clear()
while (docs.hasNext) {
var doc = docs.next
cache.put(...............
}
cache
}
这样,spark streaming submit启动后,每个task会多开一个task,每隔X次(我的情况是1或2小时)刷新单例集合的值,并且始终恢复实例化的值:
def getCache():mutable.HashMap[String, Description]={
cache
}
也许问题太简单了,至少看起来是这样,但我有以下问题:
一个。在spark streaming进程中执行spark submit。
ccc.foreachRDD(rdd => {
rdd.repartition(20).foreachPartition(p => {
val repo = getReposX
t foreach (g => {
.................
乙。 getReposX 是一个函数,它在 mongoDB 中进行查询,恢复一个 Map 机智 key/value 在每个进程的执行者中都是必需的。
C.进入 foreach 中的每个 g 我管理这张地图 "cached"
问题是当 mongo 集合中发生任何变化时,我没有观察或没有检测到变化,因此我正在管理未更新的地图。我的问题是:我怎样才能得到它?是的,我知道如果我重新启动 spark-submit 并再次执行驱动程序是可以的,但否则我将永远不会在我的地图中看到更新。
有什么想法或建议吗? 问候。
最后我开发了一个解决方案。首先,我更详细地解释了这个问题,因为我真正想知道的是如何实现一个对象或 "cache",它每隔一段时间就会刷新一次,或者按某种顺序刷新,而不需要重新启动 spark streaming process,也就是活着刷新。
在我的例子中,这个 "cache" 或刷新的对象是一个连接到 mongoDB 集合的对象(Singleton),以恢复每个执行程序使用并缓存在内存中的 HashMap作为一个好的单身人士。这样做的问题是,一旦执行了 spark streaming 提交,它就会将该对象缓存在内存中,但除非进程重新启动,否则它不会刷新。把一个广播想象成变量达到1000时刷新的计数器模式,但是这些都是只读的,不能修改。想到一个计数器,但是这些只能被驱动读取。
最后我的解决方案是,在加载 mongo 集合和缓存的对象的初始化块中,我实现了这个:
//Initialization Block
{
val ex = new ScheduledThreadPoolExecutor(1)
val task = new Runnable {
def run() = {
logger.info("Refresh - Inicialization")
initCache
}
}
val f = ex.scheduleAtFixedRate(task, 0, TIME_REFRES, TimeUnit.SECONDS)
}
initCache is nothing more than a function that connects mongo and loads a collection:
var cache = mutable.HashMap[String,Description]()
def initCache():mutable.HashMap[String, Description]={
val serverAddresses = Functions.getMongoServers(SERVER, PORT)
val mongoConnectionFactory = new MongoCollectionFactory(serverAddresses,DATABASE,COLLECTION,USERNAME,PASSWORD)
val collection = mongoConnectionFactory.getMongoCollection()
val docs = collection.find.iterator()
cache.clear()
while (docs.hasNext) {
var doc = docs.next
cache.put(...............
}
cache
}
这样,spark streaming submit启动后,每个task会多开一个task,每隔X次(我的情况是1或2小时)刷新单例集合的值,并且始终恢复实例化的值:
def getCache():mutable.HashMap[String, Description]={
cache
}