Flink 迁移:BucketingSink[T] 到 StreamingFileSink[T]
Flink migration: BucketingSink[T] to StreamingFileSink[T]
我有最老的 Flink 版本,想更新到最新的稳定版本。 BucketingSink
在最新版本中被删除,我尝试用 StreamingFileSink
更改它。要初始化它,我使用 StreamingFileSink.forBulkFormat
但有错误:
type arguments [T] do not conform to method forSpecificRecord's type parameter bo
unds [T <: org.apache.avro.specific.SpecificRecordBase]
[ERROR] .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
我也找不到如何设置bucketer:DateTimeBucketer[T], inactiveBucketThreshold, writer: Writer[T]
你能帮我找到正确的方法吗
旧代码:
trait Runner[T <: SpecificRecordBase] extends Serializable {
def createHdfsSink(conf: FlinkConfig, path: String): BucketingSink[T] = {
val bucketer = new DateTimeBucketer[T]
val sink = new BucketingSink[T](s"${conf.output}/$path")
sink
.setBatchSize(toBytes(conf.batchSize))
.setBucketer(bucketer)
.setInactiveBucketThreshold(toMillis(conf.inactiveBucketThreshold))
.setWriter(writer)
.setPendingPrefix(pendingPrefix)
.setBatchRolloverInterval(conf.fileOpenIntervalTime)
}
有错误的新代码:
def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
.build()
// TODO: .withOutputFileConfig()
sink
}
我认为,您应该改用 forReflectRecord(Class<T> type)
方法,该方法将使用反射为该类型创建模式并使用该模式写入记录。自定义存储桶分配程序在 StreamingFileSink
设置期间配置,并通过 .withBucketAssigner(BucketAssigner<IN, ID> assigner)
方法调用指定。
因此,最终您的 StreamingFileSink
生成器将如下所示:
def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](classOf[T]) )
.withBucketAssigner(bucketAssigner)
.build()
sink
}
我有最老的 Flink 版本,想更新到最新的稳定版本。 BucketingSink
在最新版本中被删除,我尝试用 StreamingFileSink
更改它。要初始化它,我使用 StreamingFileSink.forBulkFormat
但有错误:
type arguments [T] do not conform to method forSpecificRecord's type parameter bo
unds [T <: org.apache.avro.specific.SpecificRecordBase]
[ERROR] .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
我也找不到如何设置bucketer:DateTimeBucketer[T], inactiveBucketThreshold, writer: Writer[T]
你能帮我找到正确的方法吗
旧代码:
trait Runner[T <: SpecificRecordBase] extends Serializable {
def createHdfsSink(conf: FlinkConfig, path: String): BucketingSink[T] = {
val bucketer = new DateTimeBucketer[T]
val sink = new BucketingSink[T](s"${conf.output}/$path")
sink
.setBatchSize(toBytes(conf.batchSize))
.setBucketer(bucketer)
.setInactiveBucketThreshold(toMillis(conf.inactiveBucketThreshold))
.setWriter(writer)
.setPendingPrefix(pendingPrefix)
.setBatchRolloverInterval(conf.fileOpenIntervalTime)
}
有错误的新代码:
def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forSpecificRecord[T](classOf[T]) )
.build()
// TODO: .withOutputFileConfig()
sink
}
我认为,您应该改用 forReflectRecord(Class<T> type)
方法,该方法将使用反射为该类型创建模式并使用该模式写入记录。自定义存储桶分配程序在 StreamingFileSink
设置期间配置,并通过 .withBucketAssigner(BucketAssigner<IN, ID> assigner)
方法调用指定。
因此,最终您的 StreamingFileSink
生成器将如下所示:
def createHdfsStreamingSink[T : ClassTag](conf: FlinkConfig, path: String): StreamingFileSink[T] = {
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](classOf[T]) )
.withBucketAssigner(bucketAssigner)
.build()
sink
}