Flink Collector.collect(T) 消耗超过 150 秒

Flink Collector.collect(T) is consuming more than 150 sec

我正在扩展 Apache Flink 的 KeyedProcessFunction 来定义工作流。我的工作流程由大约 10-15 个处理器组成。所有其他处理器 collector.collect(T) 在 1 秒内完成。而在最坏的情况下,一个特定的 ProcessFuntion 需要超过 150 秒。此流程函数发出与其他流程函数相同类型的有效负载。有效载荷的大小也与其他处理器非常相似。我还在每个 keyedProcessFunction 之后依赖 KeyBy()。 KeyBy() 对所有 processfunction 具有相同的定义,并且在整个 workFlow 中依赖于相同的属性。

如何debug/resolve collector.collect 占用这么多时间的问题?

我使用的是 Flink 1.8.0.

public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {

    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {

        try {
            if(Contant.Foo.equals(foo.y)) {
                collect(foo, out);
                return;
            }
            work(foo);
            collectEventTimePayload(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", x, e);
        }
    }

    @Timed(aspect = "ProcessFunctionWork")
    private void work(Foo foo) {
        //some business logic. In worst casem time taken is 400 ms.
    }

    @Timed(aspect = "AProcessFunctionCollector")
    private void collect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }


    @Timed(aspect = "AProcessFunctionEventTimeCollector")
    private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
        if(CollectionUtils.isNotEmpty(foo.ids())){
            collect(foo, out);
        }
    }
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
    private final ProviderWorker providerWorker;
    @Override
    public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
        try {
            if(!handleResourceIdExceptions(foo, out)) {
                Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
                    .work(foo);
                if (workedFoo.isPresent()) {
                    collectorCollect(workedFoo.get(), out);
                    return;
                }
            }
            collectorCollect(foo, out);
        } catch (Exception e) {
            log.error("error occurred while processing {} with exception", foo, e);
        }
    }



    @Timed(aspect = "BProcessFunctionCollector")
    private void collectorCollect(Foo foo, Collector<Foo> out) {
        out.collect(foo);
    }
}

AProcessFunction.collect() 在最坏的情况下需要 150 秒。而 BProcessFunction 需要 < 30 毫秒。 我的工作流程是

dataStream.keyBy(fooKeyByFunction).process(someOtherProcessFunction).keyBy(fooKeyByFunction).process(aProcessFunction).keyBy(fooKeyByFunction).process(bProcessFunction).keyBy(fooKeyByFunction).process(cProcessFunction).keyBy(fooKeyByFunction).sink(sink);

collector.collect 方法究竟有什么作用?它包括消息写入缓冲区之前的时间,还是包括下一个任务的输入缓冲区填满之前的时间?

Collector.collect 以阻塞方式将数据写入缓冲区,这些缓冲区通过网络异步发送到相应的任务。所以所需的时间取决于序列化时间 + 如果所有缓冲区都用完了,等待空闲缓冲区的时间。缓冲区只有在通过网络发送到下游任务后才可用。如果该任务出现瓶颈,则意味着无法立即发送缓冲区并受到背压。

在你的情况下,我怀疑你确实有背压(在 Web UI 中很容易看到)并且缓冲区需要很长时间才能可用。常见的背压情况有两种:

  • 来自sink:如果写入数据的时间比产生数据的时间长,最终你的整个DAG都会被背压,整个处理速度就会变成sink的写入速度。解决方案可能是使用不同的接收器或增强目标系统。
  • 来自数据倾斜:其中一个 keyby 可能使用少数值占主导地位的键。这意味着您的整个并行性有效地下降到这几个值。每个键只能由一个子任务处理(一致性保证)。然后这个子任务过载,而该特定流程功能的其他子任务或多或少地空闲。这里的解决方案是使用不同的密钥或使用一些支持预聚合的聚合。

在这两种情况下,出发点都是缩小 Web 问题的范围 UI。很乐意提供更多信息。

注意:根据您的消息来源,我完全没有必要 keyBy。如果没有 keyBy,您可能会获得更好的并行性,而且速度应该更快。