如何解决状态大小问题的慢速检查点问题?
How to fix the slow checkpoint with small state size issue?
我有一个flink应用程序(flink版本是1.9.2)启用了checkpoint功能。当我运行它在apache flink 平台上。我总是收到检查点失败消息:检查点在 completing.After 之前过期检查点期间检查 taskManager 的线程转储,我发现包含两个请求外部服务的操作符的线程始终处于 运行 启用状态。下面是我对该运算符的设计和检查点配置。请帮忙指教如何解决这个问题?
算子设计:
public class OperatorA extends RichMapFunction<POJOA, POJOA> {
private Connection connection;
private String getCusipSourceIdPairsQuery;
private String getCusipListQuery;
private MapState<String, List<POJOX>> modifiedCusipState;
private MapState<String, List<POJOX>> bwicMatchedModifiedCusipState;
@Override
public POJOA map(POJOA value) throw Exception {
// create local variable PreparedStatement every time invoke this map method
// update/clear those two MapStates
}
@Override
public void open(Configuration parameters) {
// initialize jdbc connection and TTL MapStates using GlobalJobParameters
}
@Override
public void close() {
// close jdbc connection
}
}
public class OperatorB extends RichMapFunction<POJOA, POJOA> {
private MyServiceA serviceA;
private MyServiceB serviceB;
@Override
public POJOA map(POJOA value) throw Exception {
// call a restful GET API of ServiceB, get a XML response, about 500 fields in the response.
// use serviceA's function to extract the XML document and then populate the value fields.
}
@Override
public void open(Configuration parameters) {
// initialize local jdbc connection and PreparedStatement using globalJobParameters. then use the executed results to initialize serviceA.
// initialize serviceB.
}
}
检查点配置:
Checkpointing Mode Exactly Once
Interval 15m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 5m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Disabled
示例检查点历史记录:
ID Status Acknowledged Trigger Time Latest Acknowledgement End to End Duration State Size Buffered During Alignment
20 In Progress 3/12 (25%) 15:03:13 15:04:14 1m 1s 5.65 KB 0 B
19 Failed 3/12 14:48:13 14:50:12 10m 0s 5.65 KB 0 B
18 Failed 3/12 14:33:13 14:34:50 10m 0s 5.65 KB 0 B
17 Failed 4/12 14:18:13 14:27:04 9m 59s 2.91 MB 64.0 KB
16 Failed 3/12 14:03:13 14:05:18 10m 0s 5.65 KB 0 B
以下是我在定位过期检查点问题时经常使用的一些技巧:
- 检查检查点UI以了解导致过期的子任务的分布。
- 如果大多数子任务已经完成检查点,请跳至提示 3,否则跳至提示 4。
- 最可能的原因是数据倾斜,问题子任务收到的记录比其他子任务多得多。如果不是数据倾斜的问题,查看子任务运行所在的主机,是否有CPU/MEM/DISK的问题导致子任务的消耗变慢
- 这种情况比较少见,一般是使用过的代码造成的。例如,用户尝试访问操作员中的数据库,但连接不稳定,这会减慢处理速度。
在 Flink 用户函数(例如,RichMap 或 ProcessFunction)中进行任何类型的阻塞 i/o 都会给检查点带来麻烦。原因是很容易以显着的背压结束,这会阻止检查点障碍在执行图中取得足够快的进展,从而导致检查点超时。
对此进行改进的首选方法是使用 async i/o 而不是 RichMap。这将允许在任何给定时刻有更多未完成的请求(假设外部服务能够处理更高的负载),并且不会让操作员阻塞在用户代码中等待对同步请求的响应——从而允许检查点畅行无阻
另一种方法是增加集群的并行度,这应该可以减少背压,但代价是占用更多的计算资源,这些资源除了等待之外不会做太多事情。
在最坏的情况下,外部服务根本无法满足您的吞吐量要求,那么背压是不可避免的。这将变得更加难以管理,但是 unaligned checkpoints,即将在 Flink 1.11 中出现,应该会有所帮助。
我最近遇到了类似的问题。 @David Anderson 提供的建议真的很好!不过,我还有几点要补充。
你可以试试tune your checkpoints according to Apache Flink documentation.
在我的例子中,检查点间隔低于检查点之间的最小暂停时间,所以我增加了它以使其更大。在我的例子中,我将检查点间隔乘以 2,并将此值设置为检查点之间的最小暂停。
您也可以尝试增加检查点超时。
另一个问题可能是 ValueState
。我的流水线在很长一段时间内保持状态,并且没有驱逐导致吞吐量问题的原因。我设置TTL for the ValueState
(in my case for 30 minutes) and it started to work better. TTL is well described in the Apache Flink documentation。它真的很简单,看起来像这样:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
还值得注意的是,此 SO 线程与类似主题相关:Flink Checkpoint Failure - Checkpoints time out after 10 mins 并且其中提供的提示可能有用。
此致,
彼得
我有一个flink应用程序(flink版本是1.9.2)启用了checkpoint功能。当我运行它在apache flink 平台上。我总是收到检查点失败消息:检查点在 completing.After 之前过期检查点期间检查 taskManager 的线程转储,我发现包含两个请求外部服务的操作符的线程始终处于 运行 启用状态。下面是我对该运算符的设计和检查点配置。请帮忙指教如何解决这个问题?
算子设计:
public class OperatorA extends RichMapFunction<POJOA, POJOA> {
private Connection connection;
private String getCusipSourceIdPairsQuery;
private String getCusipListQuery;
private MapState<String, List<POJOX>> modifiedCusipState;
private MapState<String, List<POJOX>> bwicMatchedModifiedCusipState;
@Override
public POJOA map(POJOA value) throw Exception {
// create local variable PreparedStatement every time invoke this map method
// update/clear those two MapStates
}
@Override
public void open(Configuration parameters) {
// initialize jdbc connection and TTL MapStates using GlobalJobParameters
}
@Override
public void close() {
// close jdbc connection
}
}
public class OperatorB extends RichMapFunction<POJOA, POJOA> {
private MyServiceA serviceA;
private MyServiceB serviceB;
@Override
public POJOA map(POJOA value) throw Exception {
// call a restful GET API of ServiceB, get a XML response, about 500 fields in the response.
// use serviceA's function to extract the XML document and then populate the value fields.
}
@Override
public void open(Configuration parameters) {
// initialize local jdbc connection and PreparedStatement using globalJobParameters. then use the executed results to initialize serviceA.
// initialize serviceB.
}
}
检查点配置:
Checkpointing Mode Exactly Once
Interval 15m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 5m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Disabled
示例检查点历史记录:
ID Status Acknowledged Trigger Time Latest Acknowledgement End to End Duration State Size Buffered During Alignment
20 In Progress 3/12 (25%) 15:03:13 15:04:14 1m 1s 5.65 KB 0 B
19 Failed 3/12 14:48:13 14:50:12 10m 0s 5.65 KB 0 B
18 Failed 3/12 14:33:13 14:34:50 10m 0s 5.65 KB 0 B
17 Failed 4/12 14:18:13 14:27:04 9m 59s 2.91 MB 64.0 KB
16 Failed 3/12 14:03:13 14:05:18 10m 0s 5.65 KB 0 B
以下是我在定位过期检查点问题时经常使用的一些技巧:
- 检查检查点UI以了解导致过期的子任务的分布。
- 如果大多数子任务已经完成检查点,请跳至提示 3,否则跳至提示 4。
- 最可能的原因是数据倾斜,问题子任务收到的记录比其他子任务多得多。如果不是数据倾斜的问题,查看子任务运行所在的主机,是否有CPU/MEM/DISK的问题导致子任务的消耗变慢
- 这种情况比较少见,一般是使用过的代码造成的。例如,用户尝试访问操作员中的数据库,但连接不稳定,这会减慢处理速度。
在 Flink 用户函数(例如,RichMap 或 ProcessFunction)中进行任何类型的阻塞 i/o 都会给检查点带来麻烦。原因是很容易以显着的背压结束,这会阻止检查点障碍在执行图中取得足够快的进展,从而导致检查点超时。
对此进行改进的首选方法是使用 async i/o 而不是 RichMap。这将允许在任何给定时刻有更多未完成的请求(假设外部服务能够处理更高的负载),并且不会让操作员阻塞在用户代码中等待对同步请求的响应——从而允许检查点畅行无阻
另一种方法是增加集群的并行度,这应该可以减少背压,但代价是占用更多的计算资源,这些资源除了等待之外不会做太多事情。
在最坏的情况下,外部服务根本无法满足您的吞吐量要求,那么背压是不可避免的。这将变得更加难以管理,但是 unaligned checkpoints,即将在 Flink 1.11 中出现,应该会有所帮助。
我最近遇到了类似的问题。 @David Anderson 提供的建议真的很好!不过,我还有几点要补充。
你可以试试tune your checkpoints according to Apache Flink documentation.
在我的例子中,检查点间隔低于检查点之间的最小暂停时间,所以我增加了它以使其更大。在我的例子中,我将检查点间隔乘以 2,并将此值设置为检查点之间的最小暂停。
您也可以尝试增加检查点超时。
另一个问题可能是 ValueState
。我的流水线在很长一段时间内保持状态,并且没有驱逐导致吞吐量问题的原因。我设置TTL for the ValueState
(in my case for 30 minutes) and it started to work better. TTL is well described in the Apache Flink documentation。它真的很简单,看起来像这样:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
还值得注意的是,此 SO 线程与类似主题相关:Flink Checkpoint Failure - Checkpoints time out after 10 mins 并且其中提供的提示可能有用。
此致,
彼得