Flink MapStateDescriptor 更新 List 值
Flink MapStateDescriptor update List value
我使用 MapStateDescriptor
进行状态计算。这里有一些代码
final val myMap = new MapStateDescriptor[String, List[String]]("myMap", classOf[String], classOf[List[String]])
在我的计算过程中,我想通过向 List[String]
添加新元素来更新我的地图。
可能吗?
更新 #1
已编写以下 def 来管理我的地图
def updateTagsMapState(mapKey: String, tagId: String, mapToUpdate: MapState[String, List[String]]): Unit = {
if (mapToUpdate.contains(mapKey)) {
val mapValues: List[String] = mapToUpdate.get(mapKey)
val updatedMapValues: List[String] = tagId :: mapValues
mapToUpdate.put(mapKey, updatedMapValues)
} else {
mapToUpdate.put(mapKey,List(tagId))
}
}
当然可以。根据您将在那里使用的 Scala List
还是 Java,您可以像这样从描述符实际创建状态:
lazy val stateMap = getRuntimeContext.getMapState(myMap)
那么你可以简单地做:
val list = stateMap.get("someKey")
stateMap.put("someKey", list +: "SomeVal")
请注意,如果您要使用可变数据结构,则不一定需要再次调用 put
,因为数据结构的更新也会更新状态。但是这种方法在 RocksDB 状态的情况下不起作用,因为在这种情况下状态仅在您调用 put
后更新,因此始终建议更新状态本身而不是仅仅更新底层对象。
我使用 MapStateDescriptor
进行状态计算。这里有一些代码
final val myMap = new MapStateDescriptor[String, List[String]]("myMap", classOf[String], classOf[List[String]])
在我的计算过程中,我想通过向 List[String]
添加新元素来更新我的地图。
可能吗?
更新 #1
已编写以下 def 来管理我的地图
def updateTagsMapState(mapKey: String, tagId: String, mapToUpdate: MapState[String, List[String]]): Unit = {
if (mapToUpdate.contains(mapKey)) {
val mapValues: List[String] = mapToUpdate.get(mapKey)
val updatedMapValues: List[String] = tagId :: mapValues
mapToUpdate.put(mapKey, updatedMapValues)
} else {
mapToUpdate.put(mapKey,List(tagId))
}
}
当然可以。根据您将在那里使用的 Scala List
还是 Java,您可以像这样从描述符实际创建状态:
lazy val stateMap = getRuntimeContext.getMapState(myMap)
那么你可以简单地做:
val list = stateMap.get("someKey")
stateMap.put("someKey", list +: "SomeVal")
请注意,如果您要使用可变数据结构,则不一定需要再次调用 put
,因为数据结构的更新也会更新状态。但是这种方法在 RocksDB 状态的情况下不起作用,因为在这种情况下状态仅在您调用 put
后更新,因此始终建议更新状态本身而不是仅仅更新底层对象。