Flink SQL 不支持 "table.exec.source.idle-timeout" 设置

Flink SQL does not honor "table.exec.source.idle-timeout" setting

我有一个 Flink 作业 运行 FlinkSQL 具有以下设置:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

为了使用 Kafka 源在本地进行测试,我向 Flink 作业发射了一些事件。 Flink UI 显示它产生了一个水印。我等了 3 分钟,看看水印是否在没有发送新事件(即空闲分区)的情况下前进。但是,没有出现水印推进。

注意:我在本地使用具有三个分区的 Kafka 代理。我的测试数据是键控的,因此被发送到同一个分区。但是,即使其他分区空闲并且我等待 3 分钟,我也没有看到水印前进。

  1. 作业中的任何位置 UI 我可以查看我设置的 3 分钟值是否真的被拾取了?我使用的单位是否正确(秒与毫秒)

  2. 还有什么我可以检查以测试此设置的吗?

我们是运行 Flink 1.12.1.

更新:我在我的 Flink SQL 作业中看到这个异常,异常:不知道是否存在版本不匹配。

2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null[=13=](OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)

问题是此设置在 Flink 1.12.0 或 1.12.1 中不起作用。我不得不升级到 Flink 1.13.2 并且该设置得到认可并按预期工作。

唯一的例外是转移注意力并且无法始终如一地重现。