如何附加到 Flink 中的文件接收器
How to append to file sink in Flink
我使用的是 Flink 1.12,我有以下简单的代码片段。我想在每次 运行 程序时向 D:/Sql004_ConnectFileReadAndWrite.csv
添加一些数据。
我在运行程序的时候,第一次发现只有文件不存在才能写入数据。但是我想在我再次 运行 应用程序时追加数据。
我会问如何将数据追加到文件中,即使文件已经存在。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object Sql004_ConnectFileReadAndWrite {
def main(args: Array[String]): Unit = {
println("Sql004_ConnectFileReadAndWrite")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val fmt = new Csv().fieldDelimiter(',').deriveSchema()
val schema = new Schema()
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
val path = "D:/Sql004_ConnectFileReadAndWrite.csv"
tenv.connect(new FileSystem().path(path)).withSchema(schema).withFormat(fmt).createTemporaryTable("sinkTable")
val sourceStream = env.fromElements(("a", "b", "c"), ("d", "e", "f"))
sourceStream.print()
val table = tenv.fromDataStream(sourceStream).as("c1", "c2", "c3")
table.executeInsert("sinkTable")
env.execute("Sql004_ConnectFileReadAndWrite")
}
}
Flink 的文件系统抽象不支持附加到现有文件,或覆盖以前写入的数据的一部分。这是因为 Flink 希望将某些对象存储(例如 S3)视为只为所涉及的操作提供最终一致性的文件系统。
我使用的是 Flink 1.12,我有以下简单的代码片段。我想在每次 运行 程序时向 D:/Sql004_ConnectFileReadAndWrite.csv
添加一些数据。
我在运行程序的时候,第一次发现只有文件不存在才能写入数据。但是我想在我再次 运行 应用程序时追加数据。
我会问如何将数据追加到文件中,即使文件已经存在。
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
object Sql004_ConnectFileReadAndWrite {
def main(args: Array[String]): Unit = {
println("Sql004_ConnectFileReadAndWrite")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val fmt = new Csv().fieldDelimiter(',').deriveSchema()
val schema = new Schema()
.field("a", DataTypes.STRING())
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
val path = "D:/Sql004_ConnectFileReadAndWrite.csv"
tenv.connect(new FileSystem().path(path)).withSchema(schema).withFormat(fmt).createTemporaryTable("sinkTable")
val sourceStream = env.fromElements(("a", "b", "c"), ("d", "e", "f"))
sourceStream.print()
val table = tenv.fromDataStream(sourceStream).as("c1", "c2", "c3")
table.executeInsert("sinkTable")
env.execute("Sql004_ConnectFileReadAndWrite")
}
}
Flink 的文件系统抽象不支持附加到现有文件,或覆盖以前写入的数据的一部分。这是因为 Flink 希望将某些对象存储(例如 S3)视为只为所涉及的操作提供最终一致性的文件系统。