Flink 1.11 StreamingFileSink 刷新记录到正在进行的文件

Flink 1.11 StreamingFileSink flush record(s) early to in-progress file

我们有一个每小时分桶的 StreamingFileSink(到 HDFS),其中记录相对不频繁。 有没有一种方法可以配置 Flink 将记录一到达(不到 1 分钟)就刷新到 in-progress 文件,而不是 Flink 将它们保存在缓冲区中?

要求后续数据分析进程近乎实时地读取 in-progress 文件。

我知道如何缩短InactivityInterval,但最终导致文件太多,这是不可取的。

StreamingFileSink时间数据实时写入HDFS,不会一直保存在buffer中

为了支持exactly once语义,progress文件只会在checkpoint时重命名为正式文件,然后才能进行后续数据分析

StreamingFileSink不支持实时数据查询,可以减小cp间隔提高数据可见性的实时性,但cp间隔太小容易导致小文件问题

也许你可以看看写函数的实现。有几种实现会将数据真正写入 hdfs 文件 time.StreamingFileSink 本身不会将它们保存在缓冲区中,但是 FileOutputStream

中会有一些缓冲区
public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {

/**
 * Write a element to the part file.
 *
 * @param element the element to be written.
 * @param currentTime the writing time.
 * @throws IOException Thrown if writing the element fails.
 */
void write(final IN element, final long currentTime) throws IOException;

/**
 * @return The state of the current part file.
 * @throws IOException Thrown if persisting the part file fails.
 */
InProgressFileRecoverable persist() throws IOException;

/**
 * @return The state of the pending part file. {@link Bucket} uses this to commit the pending
 *     file.
 * @throws IOException Thrown if an I/O error occurs.
 */
PendingFileRecoverable closeForCommit() throws IOException;

/** Dispose the part file. */
void dispose();

// ------------------------------------------------------------------------

/** A handle can be used to recover in-progress file.. */
interface InProgressFileRecoverable extends PendingFileRecoverable {}

/** The handle can be used to recover pending file. */
interface PendingFileRecoverable {}

}

您是否考虑过启用文件压缩选项? https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#file-compaction