文件系统连接器接收器如何工作
How does the file system connector sink work
我正在使用以下简单代码来说明文件系统连接器的行为。
我有两个观察结果想询问并确认。
如果我没有启用检查点,那么所有生成的part-XXX文件的文件名中总是包含inprogress
,这是否意味着这些文件没有提交?另外,这是否意味着如果我想使用文件系统连接器接收器,那么我总是需要 enable checkpointing
以便可以提交生成的文件并且下游(如 hive 或 flink)可以发现并读取这些文件?
分区中的inprogress
文件什么时候移动到正常?是不是在创建新分区的时候,checkpoint开始到运行的时候,就把之前分区的文件从inprogress变成正式的了?如果是这样,那么可能有一个 deplay(checkpoint interval) 分区是可见的。
我在代码中设置了滚动间隔为20秒,但是当我查看生成的part-XXX文件时,后续文件的创建时间相差25秒。我以为应该是20秒
例如,
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-01-03 12:39:04
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-01-03 12:39:29
密码是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10*1000)
env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
ds.print()
val ddl =
s"""
create table sinkTable(
id string,
p_day STRING,
p_hour STRING,
p_min STRING
) partitioned by(p_day, p_hour, p_min) with (
'connector' = 'filesystem',
'path' = 'D:/csv-${System.currentTimeMillis()}',
'format' = 'csv',
'sink.rolling-policy.check-interval' = '5 s',
'sink.rolling-policy.rollover-interval' = '20 s',
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '0 s'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm') from sourceTable
""".stripMargin(' '))
env.execute()
}
要点 1 涵盖在 StreamingFileSink docs:
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the in-progress
or the pending
state, and cannot be safely read by downstream systems.
对于第 2 点,部分文件生命周期记录在案 here,这解释了 in-progress
文件根据滚动策略转换为 pending
,并且仅变为 finished
当检查点完成时。因此,根据滚动策略和检查点间隔,某些文件可能 pending
相当长一段时间。
对于第 3 点,rollover-interval
为 20 秒,check-interval
为 5 秒,翻转将在 20 到 25 秒之间的某个时间后发生。有关 check-interval
:
的解释,请参阅 Rolling Policy 文档
The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.
我正在使用以下简单代码来说明文件系统连接器的行为。 我有两个观察结果想询问并确认。
如果我没有启用检查点,那么所有生成的part-XXX文件的文件名中总是包含
inprogress
,这是否意味着这些文件没有提交?另外,这是否意味着如果我想使用文件系统连接器接收器,那么我总是需要enable checkpointing
以便可以提交生成的文件并且下游(如 hive 或 flink)可以发现并读取这些文件?分区中的
inprogress
文件什么时候移动到正常?是不是在创建新分区的时候,checkpoint开始到运行的时候,就把之前分区的文件从inprogress变成正式的了?如果是这样,那么可能有一个 deplay(checkpoint interval) 分区是可见的。我在代码中设置了滚动间隔为20秒,但是当我查看生成的part-XXX文件时,后续文件的创建时间相差25秒。我以为应该是20秒
例如,
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-01-03 12:39:04
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-01-03 12:39:29
密码是:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10*1000)
env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
ds.print()
val ddl =
s"""
create table sinkTable(
id string,
p_day STRING,
p_hour STRING,
p_min STRING
) partitioned by(p_day, p_hour, p_min) with (
'connector' = 'filesystem',
'path' = 'D:/csv-${System.currentTimeMillis()}',
'format' = 'csv',
'sink.rolling-policy.check-interval' = '5 s',
'sink.rolling-policy.rollover-interval' = '20 s',
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '0 s'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm') from sourceTable
""".stripMargin(' '))
env.execute()
}
要点 1 涵盖在 StreamingFileSink docs:
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the
in-progress
or thepending
state, and cannot be safely read by downstream systems.
对于第 2 点,部分文件生命周期记录在案 here,这解释了 in-progress
文件根据滚动策略转换为 pending
,并且仅变为 finished
当检查点完成时。因此,根据滚动策略和检查点间隔,某些文件可能 pending
相当长一段时间。
对于第 3 点,rollover-interval
为 20 秒,check-interval
为 5 秒,翻转将在 20 到 25 秒之间的某个时间后发生。有关 check-interval
:
The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.