对于 Flink v 1.10.1 或更高版本,您如何以编程方式编辑保存点元数据文件中的容器绝对路径?

For Flink v 1.10.1 or later how can you programmatically edit the container absolute path in a savepoint metadate file?

我正在探索如何更改 Flink 保存点的元数据文件中包含的绝对路径。

我们正在寻求跨 AWS 区域迁移 flink 流;但是预计 运行 由于这个绝对路径会出现问题。 Flink 文档提到了这个问题并建议使用 SavepointV2Serializer 来编辑路径:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#can-i-move-the-savepoint-files-on-stable-storage

任何人都可以帮助我找出一个说明如何执行此操作的示例吗?我没能在网上找到参考资料。

此外,虽然在 _metadata 文件中看到了一个绝对路径,但我没有在生成的反序列化对象中找到对它的任何引用,也没有将其保存到序列化文件中。

提前感谢您的指导。

这是我写的主要文件:

object Main extends App {
  val meta = "src" / "main" / "resources" / "_metadata"
  println( s"meta: ${meta.path}: ${meta.exists}" )
  val contents = meta.contentAsString
  println( contents )
//  val serde1 = SavepointV1Serializer.INSTANCE
  val serde2 = SavepointV2Serializer.INSTANCE

  import scala.jdk.CollectionConverters._

  val data = meta.inputStream() { in =>
    val dis = new java.io.DataInputStream( in )
    serde2.deserialize( dis, Thread.currentThread().getContextClassLoader )
  }

  println( s"META: ${data}" )
  println( s"METADATA.version: ${data.getVersion}" )
  println( s"METADATA.checkpointId: ${data.getCheckpointId}" )
  println( s"METADATA.masterStates: ${Option( data.getMasterStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )
  println(
    s"METADATA.operatorStates: ${Option( data.getOperatorStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}"
  )
  println( s"METADATA.taskStates: ${Option( data.getTaskStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )

  val newMeta = "src" / "main" / "resources" / "_NEW_metedata"

  val newData = new SavepointV2(
    data.getCheckpointId,
    Seq.empty[OperatorState].asJava,
    data.getMasterStates
  )
  println( s"NEW_DATA:OpStates: ${newData.getOperatorStates}" )

  newMeta.outputStream() { out =>
    serde2.serialize( newData, new java.io.DataOutputStream( out ) )
  }
}

根本问题实际上已在 Flink 1.11 中修复——参见 FLINK-5763——保存点现在可以重定位,并且不再包含绝对路径。唯一的例外似乎是您使用 GenericWriteAheadLog 接收器。

文档需要更新,参见FLINK-19381

所以如果你能先升级到1.11.x,那么你应该可以避免这个问题。