Flink 1.11 StreamingFileSink 刷新记录到正在进行的文件
Flink 1.11 StreamingFileSink flush record(s) early to in-progress file
我们有一个每小时分桶的 StreamingFileSink
(到 HDFS),其中记录相对不频繁。
有没有一种方法可以配置 Flink 将记录一到达(不到 1 分钟)就刷新到 in-progress
文件,而不是 Flink 将它们保存在缓冲区中?
要求后续数据分析进程近乎实时地读取 in-progress
文件。
我知道如何缩短InactivityInterval
,但最终导致文件太多,这是不可取的。
StreamingFileSink时间数据实时写入HDFS,不会一直保存在buffer中
为了支持exactly once语义,progress文件只会在checkpoint时重命名为正式文件,然后才能进行后续数据分析
StreamingFileSink不支持实时数据查询,可以减小cp间隔提高数据可见性的实时性,但cp间隔太小容易导致小文件问题
也许你可以看看写函数的实现。有几种实现会将数据真正写入 hdfs 文件 time.StreamingFileSink 本身不会将它们保存在缓冲区中,但是 FileOutputStream
中会有一些缓冲区
public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
/**
* Write a element to the part file.
*
* @param element the element to be written.
* @param currentTime the writing time.
* @throws IOException Thrown if writing the element fails.
*/
void write(final IN element, final long currentTime) throws IOException;
/**
* @return The state of the current part file.
* @throws IOException Thrown if persisting the part file fails.
*/
InProgressFileRecoverable persist() throws IOException;
/**
* @return The state of the pending part file. {@link Bucket} uses this to commit the pending
* file.
* @throws IOException Thrown if an I/O error occurs.
*/
PendingFileRecoverable closeForCommit() throws IOException;
/** Dispose the part file. */
void dispose();
// ------------------------------------------------------------------------
/** A handle can be used to recover in-progress file.. */
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/** The handle can be used to recover pending file. */
interface PendingFileRecoverable {}
}
您是否考虑过启用文件压缩选项? https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction
我们有一个每小时分桶的 StreamingFileSink
(到 HDFS),其中记录相对不频繁。
有没有一种方法可以配置 Flink 将记录一到达(不到 1 分钟)就刷新到 in-progress
文件,而不是 Flink 将它们保存在缓冲区中?
要求后续数据分析进程近乎实时地读取 in-progress
文件。
我知道如何缩短InactivityInterval
,但最终导致文件太多,这是不可取的。
StreamingFileSink时间数据实时写入HDFS,不会一直保存在buffer中
为了支持exactly once语义,progress文件只会在checkpoint时重命名为正式文件,然后才能进行后续数据分析
StreamingFileSink不支持实时数据查询,可以减小cp间隔提高数据可见性的实时性,但cp间隔太小容易导致小文件问题
也许你可以看看写函数的实现。有几种实现会将数据真正写入 hdfs 文件 time.StreamingFileSink 本身不会将它们保存在缓冲区中,但是 FileOutputStream
中会有一些缓冲区public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
/**
* Write a element to the part file.
*
* @param element the element to be written.
* @param currentTime the writing time.
* @throws IOException Thrown if writing the element fails.
*/
void write(final IN element, final long currentTime) throws IOException;
/**
* @return The state of the current part file.
* @throws IOException Thrown if persisting the part file fails.
*/
InProgressFileRecoverable persist() throws IOException;
/**
* @return The state of the pending part file. {@link Bucket} uses this to commit the pending
* file.
* @throws IOException Thrown if an I/O error occurs.
*/
PendingFileRecoverable closeForCommit() throws IOException;
/** Dispose the part file. */
void dispose();
// ------------------------------------------------------------------------
/** A handle can be used to recover in-progress file.. */
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/** The handle can be used to recover pending file. */
interface PendingFileRecoverable {}
}
您是否考虑过启用文件压缩选项? https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction