对于 Flink v 1.10.1 或更高版本,您如何以编程方式编辑保存点元数据文件中的容器绝对路径?
For Flink v 1.10.1 or later how can you programmatically edit the container absolute path in a savepoint metadate file?
我正在探索如何更改 Flink 保存点的元数据文件中包含的绝对路径。
我们正在寻求跨 AWS 区域迁移 flink 流;但是预计 运行 由于这个绝对路径会出现问题。 Flink 文档提到了这个问题并建议使用 SavepointV2Serializer 来编辑路径:
任何人都可以帮助我找出一个说明如何执行此操作的示例吗?我没能在网上找到参考资料。
此外,虽然在 _metadata 文件中看到了一个绝对路径,但我没有在生成的反序列化对象中找到对它的任何引用,也没有将其保存到序列化文件中。
提前感谢您的指导。
这是我写的主要文件:
object Main extends App {
val meta = "src" / "main" / "resources" / "_metadata"
println( s"meta: ${meta.path}: ${meta.exists}" )
val contents = meta.contentAsString
println( contents )
// val serde1 = SavepointV1Serializer.INSTANCE
val serde2 = SavepointV2Serializer.INSTANCE
import scala.jdk.CollectionConverters._
val data = meta.inputStream() { in =>
val dis = new java.io.DataInputStream( in )
serde2.deserialize( dis, Thread.currentThread().getContextClassLoader )
}
println( s"META: ${data}" )
println( s"METADATA.version: ${data.getVersion}" )
println( s"METADATA.checkpointId: ${data.getCheckpointId}" )
println( s"METADATA.masterStates: ${Option( data.getMasterStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )
println(
s"METADATA.operatorStates: ${Option( data.getOperatorStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}"
)
println( s"METADATA.taskStates: ${Option( data.getTaskStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )
val newMeta = "src" / "main" / "resources" / "_NEW_metedata"
val newData = new SavepointV2(
data.getCheckpointId,
Seq.empty[OperatorState].asJava,
data.getMasterStates
)
println( s"NEW_DATA:OpStates: ${newData.getOperatorStates}" )
newMeta.outputStream() { out =>
serde2.serialize( newData, new java.io.DataOutputStream( out ) )
}
}
根本问题实际上已在 Flink 1.11 中修复——参见 FLINK-5763——保存点现在可以重定位,并且不再包含绝对路径。唯一的例外似乎是您使用 GenericWriteAheadLog
接收器。
文档需要更新,参见FLINK-19381。
所以如果你能先升级到1.11.x,那么你应该可以避免这个问题。
我正在探索如何更改 Flink 保存点的元数据文件中包含的绝对路径。
我们正在寻求跨 AWS 区域迁移 flink 流;但是预计 运行 由于这个绝对路径会出现问题。 Flink 文档提到了这个问题并建议使用 SavepointV2Serializer 来编辑路径:
任何人都可以帮助我找出一个说明如何执行此操作的示例吗?我没能在网上找到参考资料。
此外,虽然在 _metadata 文件中看到了一个绝对路径,但我没有在生成的反序列化对象中找到对它的任何引用,也没有将其保存到序列化文件中。
提前感谢您的指导。
这是我写的主要文件:
object Main extends App {
val meta = "src" / "main" / "resources" / "_metadata"
println( s"meta: ${meta.path}: ${meta.exists}" )
val contents = meta.contentAsString
println( contents )
// val serde1 = SavepointV1Serializer.INSTANCE
val serde2 = SavepointV2Serializer.INSTANCE
import scala.jdk.CollectionConverters._
val data = meta.inputStream() { in =>
val dis = new java.io.DataInputStream( in )
serde2.deserialize( dis, Thread.currentThread().getContextClassLoader )
}
println( s"META: ${data}" )
println( s"METADATA.version: ${data.getVersion}" )
println( s"METADATA.checkpointId: ${data.getCheckpointId}" )
println( s"METADATA.masterStates: ${Option( data.getMasterStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )
println(
s"METADATA.operatorStates: ${Option( data.getOperatorStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}"
)
println( s"METADATA.taskStates: ${Option( data.getTaskStates ).map( _.asScala.mkString( "[", ", ", "]" ) )}" )
val newMeta = "src" / "main" / "resources" / "_NEW_metedata"
val newData = new SavepointV2(
data.getCheckpointId,
Seq.empty[OperatorState].asJava,
data.getMasterStates
)
println( s"NEW_DATA:OpStates: ${newData.getOperatorStates}" )
newMeta.outputStream() { out =>
serde2.serialize( newData, new java.io.DataOutputStream( out ) )
}
}
根本问题实际上已在 Flink 1.11 中修复——参见 FLINK-5763——保存点现在可以重定位,并且不再包含绝对路径。唯一的例外似乎是您使用 GenericWriteAheadLog
接收器。
文档需要更新,参见FLINK-19381。
所以如果你能先升级到1.11.x,那么你应该可以避免这个问题。