如何解决状态大小问题的慢速检查点问题?

How to fix the slow checkpoint with small state size issue?

我有一个flink应用程序(flink版本是1.9.2)启用了checkpoint功能。当我运行它在apache flink 平台上。我总是收到检查点失败消息:检查点在 completing.After 之前过期检查点期间检查 taskManager 的线程转储,我发现包含两个请求外部服务的操作符的线程始终处于 运行 启用状态。下面是我对该运算符的设计和检查点配置。请帮忙指教如何解决这个问题?

算子设计:

public class OperatorA extends RichMapFunction<POJOA, POJOA> {
    private Connection connection;
    private String getCusipSourceIdPairsQuery;
    private String getCusipListQuery;
    private MapState<String, List<POJOX>> modifiedCusipState;
    private MapState<String, List<POJOX>> bwicMatchedModifiedCusipState;
    
    @Override
    public POJOA map(POJOA value) throw Exception {
        // create local variable PreparedStatement every time invoke this map method
        // update/clear those two MapStates
    }
    
    @Override
    public void open(Configuration parameters) {
        // initialize jdbc connection and TTL MapStates using GlobalJobParameters
    }

    @Override
    public void close() {
        // close jdbc connection
    }
}

public class OperatorB extends RichMapFunction<POJOA, POJOA> {
    private MyServiceA serviceA;
    private MyServiceB serviceB;

    @Override
    public POJOA map(POJOA value) throw Exception {
        // call a restful GET API of ServiceB, get a XML response, about 500 fields in the response.
        // use serviceA's function to extract the XML document and then populate the value fields.
    }
    
    @Override
    public void open(Configuration parameters) {
        // initialize local jdbc connection and PreparedStatement using globalJobParameters. then use the executed results to initialize serviceA.
        // initialize serviceB.
    }
}

检查点配置:

Checkpointing Mode                  Exactly Once
Interval                            15m 0s
Timeout                             10m 0s
Minimum Pause Between Checkpoints   5m 0s
Maximum Concurrent Checkpoints      1
Persist Checkpoints Externally      Disabled

示例检查点历史记录:

ID  Status          Acknowledged    Trigger Time    Latest Acknowledgement  End to End Duration     State Size  Buffered During Alignment   
20  In Progress     3/12 (25%)      15:03:13        15:04:14                1m 1s                   5.65 KB     0 B 
19  Failed          3/12            14:48:13        14:50:12                10m 0s                  5.65 KB     0 B 
18  Failed          3/12            14:33:13        14:34:50                10m 0s                  5.65 KB     0 B 
17  Failed          4/12            14:18:13        14:27:04                9m 59s                  2.91 MB     64.0 KB 
16  Failed          3/12            14:03:13        14:05:18                10m 0s                  5.65 KB     0 B

以下是我在定位过期检查点问题时经常使用的一些技巧:

  1. 检查检查点UI以了解导致过期的子任务的分布。
  2. 如果大多数子任务已经完成检查点,请跳至提示 3,否则跳至提示 4。
  3. 最可能的原因是数据倾斜,问题子任务收到的​​记录比其他子任务多得多。如果不是数据倾斜的问题,查看子任务运行所在的主机,是否有CPU/MEM/DISK的问题导致子任务的消耗变慢
  4. 这种情况比较少见,一般是使用过的代码造成的。例如,用户尝试访问操作员中的数据库,但连接不稳定,这会减慢处理速度。

在 Flink 用户函数(例如,RichMap 或 ProcessFunction)中进行任何类型的阻塞 i/o 都会给检查点带来麻烦。原因是很容易以显着的背压结束,这会阻止检查点障碍在执行图中取得足够快的进展,从而导致检查点超时。

对此进行改进的首选方法是使用 async i/o 而不是 RichMap。这将允许在任何给定时刻有更多未完成的请求(假设外部服务能够处理更高的负载),并且不会让操作员阻塞在用户代码中等待对同步请求的响应——从而允许检查点畅行无阻

另一种方法是增加集群的并行度,这应该可以减少背压,但代价是占用更多的计算资源,这些资源除了等待之外不会做太多事情。

在最坏的情况下,外部服务根本无法满足您的吞吐量要求,那么背压是不可避免的。这将变得更加难以管理,但是 unaligned checkpoints,即将在 Flink 1.11 中出现,应该会有所帮助。

我最近遇到了类似的问题。 @David Anderson 提供的建议真的很好!不过,我还有几点要补充。

你可以试试tune your checkpoints according to Apache Flink documentation.

在我的例子中,检查点间隔低于检查点之间的最小暂停时间,所以我增加了它以使其更大。在我的例子中,我将检查点间隔乘以 2,并将此值设置为检查点之间的最小暂停。

您也可以尝试增加检查点超时。

另一个问题可能是 ValueState。我的流水线在很长一段时间内保持状态,并且没有驱逐导致吞吐量问题的原因。我设置TTL for the ValueState (in my case for 30 minutes) and it started to work better. TTL is well described in the Apache Flink documentation。它真的很简单,看起来像这样:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

还值得注意的是,此 SO 线程与类似主题相关:Flink Checkpoint Failure - Checkpoints time out after 10 mins 并且其中提供的提示可能有用。

此致,

彼得