Flink MapStateDescriptor 更新 List 值

Flink MapStateDescriptor update List value

我使用 MapStateDescriptor 进行状态计算。这里有一些代码

final val myMap = new MapStateDescriptor[String, List[String]]("myMap", classOf[String], classOf[List[String]])

在我的计算过程中,我想通过向 List[String] 添加新元素来更新我的地图。

可能吗?

更新 #1

已编写以下 def 来管理我的地图

def updateTagsMapState(mapKey: String, tagId: String, mapToUpdate: MapState[String, List[String]]): Unit = {
  if (mapToUpdate.contains(mapKey)) {
    val mapValues: List[String] = mapToUpdate.get(mapKey)
    val updatedMapValues: List[String] = tagId :: mapValues
    mapToUpdate.put(mapKey, updatedMapValues)
  } else {
    mapToUpdate.put(mapKey,List(tagId))
  }
}

当然可以。根据您将在那里使用的 Scala List 还是 Java,您可以像这样从描述符实际创建状态:

lazy val stateMap = getRuntimeContext.getMapState(myMap)

那么你可以简单地做:

val list = stateMap.get("someKey")
stateMap.put("someKey", list +: "SomeVal")

请注意,如果您要使用可变数据结构,则不一定需要再次调用 put,因为数据结构的更新也会更新状态。但是这种方法在 RocksDB 状态的情况下不起作用,因为在这种情况下状态仅在您调用 put 后更新,因此始终建议更新状态本身而不是仅仅更新底层对象。