Flink Collector.collect(T) 消耗超过 150 秒
Flink Collector.collect(T) is consuming more than 150 sec
我正在扩展 Apache Flink 的 KeyedProcessFunction
来定义工作流。我的工作流程由大约 10-15 个处理器组成。所有其他处理器 collector.collect(T) 在 1 秒内完成。而在最坏的情况下,一个特定的 ProcessFuntion 需要超过 150 秒。此流程函数发出与其他流程函数相同类型的有效负载。有效载荷的大小也与其他处理器非常相似。我还在每个 keyedProcessFunction 之后依赖 KeyBy()。 KeyBy() 对所有 processfunction 具有相同的定义,并且在整个 workFlow 中依赖于相同的属性。
如何debug/resolve collector.collect 占用这么多时间的问题?
我使用的是 Flink 1.8.0.
public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(Contant.Foo.equals(foo.y)) {
collect(foo, out);
return;
}
work(foo);
collectEventTimePayload(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", x, e);
}
}
@Timed(aspect = "ProcessFunctionWork")
private void work(Foo foo) {
//some business logic. In worst casem time taken is 400 ms.
}
@Timed(aspect = "AProcessFunctionCollector")
private void collect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
@Timed(aspect = "AProcessFunctionEventTimeCollector")
private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
if(CollectionUtils.isNotEmpty(foo.ids())){
collect(foo, out);
}
}
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
private final ProviderWorker providerWorker;
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(!handleResourceIdExceptions(foo, out)) {
Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
.work(foo);
if (workedFoo.isPresent()) {
collectorCollect(workedFoo.get(), out);
return;
}
}
collectorCollect(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", foo, e);
}
}
@Timed(aspect = "BProcessFunctionCollector")
private void collectorCollect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
}
AProcessFunction.collect() 在最坏的情况下需要 150 秒。而 BProcessFunction 需要 < 30 毫秒。
我的工作流程是
dataStream.keyBy(fooKeyByFunction).process(someOtherProcessFunction).keyBy(fooKeyByFunction).process(aProcessFunction).keyBy(fooKeyByFunction).process(bProcessFunction).keyBy(fooKeyByFunction).process(cProcessFunction).keyBy(fooKeyByFunction).sink(sink);
collector.collect 方法究竟有什么作用?它包括消息写入缓冲区之前的时间,还是包括下一个任务的输入缓冲区填满之前的时间?
Collector.collect
以阻塞方式将数据写入缓冲区,这些缓冲区通过网络异步发送到相应的任务。所以所需的时间取决于序列化时间 + 如果所有缓冲区都用完了,等待空闲缓冲区的时间。缓冲区只有在通过网络发送到下游任务后才可用。如果该任务出现瓶颈,则意味着无法立即发送缓冲区并受到背压。
在你的情况下,我怀疑你确实有背压(在 Web UI 中很容易看到)并且缓冲区需要很长时间才能可用。常见的背压情况有两种:
- 来自sink:如果写入数据的时间比产生数据的时间长,最终你的整个DAG都会被背压,整个处理速度就会变成sink的写入速度。解决方案可能是使用不同的接收器或增强目标系统。
- 来自数据倾斜:其中一个 keyby 可能使用少数值占主导地位的键。这意味着您的整个并行性有效地下降到这几个值。每个键只能由一个子任务处理(一致性保证)。然后这个子任务过载,而该特定流程功能的其他子任务或多或少地空闲。这里的解决方案是使用不同的密钥或使用一些支持预聚合的聚合。
在这两种情况下,出发点都是缩小 Web 问题的范围 UI。很乐意提供更多信息。
注意:根据您的消息来源,我完全没有必要 keyBy
。如果没有 keyBy
,您可能会获得更好的并行性,而且速度应该更快。
我正在扩展 Apache Flink 的 KeyedProcessFunction
来定义工作流。我的工作流程由大约 10-15 个处理器组成。所有其他处理器 collector.collect(T) 在 1 秒内完成。而在最坏的情况下,一个特定的 ProcessFuntion 需要超过 150 秒。此流程函数发出与其他流程函数相同类型的有效负载。有效载荷的大小也与其他处理器非常相似。我还在每个 keyedProcessFunction 之后依赖 KeyBy()。 KeyBy() 对所有 processfunction 具有相同的定义,并且在整个 workFlow 中依赖于相同的属性。
如何debug/resolve collector.collect 占用这么多时间的问题?
我使用的是 Flink 1.8.0.
public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(Contant.Foo.equals(foo.y)) {
collect(foo, out);
return;
}
work(foo);
collectEventTimePayload(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", x, e);
}
}
@Timed(aspect = "ProcessFunctionWork")
private void work(Foo foo) {
//some business logic. In worst casem time taken is 400 ms.
}
@Timed(aspect = "AProcessFunctionCollector")
private void collect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
@Timed(aspect = "AProcessFunctionEventTimeCollector")
private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
if(CollectionUtils.isNotEmpty(foo.ids())){
collect(foo, out);
}
}
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
private final ProviderWorker providerWorker;
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(!handleResourceIdExceptions(foo, out)) {
Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
.work(foo);
if (workedFoo.isPresent()) {
collectorCollect(workedFoo.get(), out);
return;
}
}
collectorCollect(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", foo, e);
}
}
@Timed(aspect = "BProcessFunctionCollector")
private void collectorCollect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
}
AProcessFunction.collect() 在最坏的情况下需要 150 秒。而 BProcessFunction 需要 < 30 毫秒。 我的工作流程是
dataStream.keyBy(fooKeyByFunction).process(someOtherProcessFunction).keyBy(fooKeyByFunction).process(aProcessFunction).keyBy(fooKeyByFunction).process(bProcessFunction).keyBy(fooKeyByFunction).process(cProcessFunction).keyBy(fooKeyByFunction).sink(sink);
collector.collect 方法究竟有什么作用?它包括消息写入缓冲区之前的时间,还是包括下一个任务的输入缓冲区填满之前的时间?
Collector.collect
以阻塞方式将数据写入缓冲区,这些缓冲区通过网络异步发送到相应的任务。所以所需的时间取决于序列化时间 + 如果所有缓冲区都用完了,等待空闲缓冲区的时间。缓冲区只有在通过网络发送到下游任务后才可用。如果该任务出现瓶颈,则意味着无法立即发送缓冲区并受到背压。
在你的情况下,我怀疑你确实有背压(在 Web UI 中很容易看到)并且缓冲区需要很长时间才能可用。常见的背压情况有两种:
- 来自sink:如果写入数据的时间比产生数据的时间长,最终你的整个DAG都会被背压,整个处理速度就会变成sink的写入速度。解决方案可能是使用不同的接收器或增强目标系统。
- 来自数据倾斜:其中一个 keyby 可能使用少数值占主导地位的键。这意味着您的整个并行性有效地下降到这几个值。每个键只能由一个子任务处理(一致性保证)。然后这个子任务过载,而该特定流程功能的其他子任务或多或少地空闲。这里的解决方案是使用不同的密钥或使用一些支持预聚合的聚合。
在这两种情况下,出发点都是缩小 Web 问题的范围 UI。很乐意提供更多信息。
注意:根据您的消息来源,我完全没有必要 keyBy
。如果没有 keyBy
,您可能会获得更好的并行性,而且速度应该更快。