如何解决流程功能中的繁忙时间问题?

How can I solve busy time problem in process function?

我有一个带有无界流的 flink(v1.13.3) 应用程序(使用 kafka)。我的一个流很忙。随着时间的推移,繁忙值(我可以在 UI 上看到)也会增加。当我刚启动flink应用时:

这个功能很简单,状态后端使用rocksdb:


public class MyObj implements Serializable
{

    private Set<String> distinctValues;

    public MyObj()
    {
        this.distinctValues = new HashSet<>();
    }

    public Set<String> getDistinctValues() {
        return distinctValues;
    }

    public void setDistinctValues(Set<String> values) {
        this.distinctValues = values;
    }
}

public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output> 
{
    
    private transient ValueState<MyObj> state;

    @Override
    public void open(Configuration parameters)
    {

        ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
                TypeInformation.of(MyObj.class));
        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        MyObj stateValue = state.value();
        if (stateValue == null)
        {
            stateValue = new MyObj();
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10mins);
        }

        stateValue.getDistinctValues().add(value.getValue());
        if (stateValue.getDistinctValues().size() >= 20)
        {
            state.clear();
        }
        else
        {
            state.update(stateValue);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        state.clear();
    }
}

注意:在实现 valueState 之前,我只是使用 ListState。但是使用 with listState flink_taskmanager_job_task_busyTimeMsPerSecond returns 25-30sn:

public class MyProcessFunction extends extends KeyedProcessFunction<String, KafkaRecord, Output> 
{

    private transient ListState<String> listState;

    @Override
    public void open(Configuration parameters)
    {
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
        listState = getRuntimeContext().getListState(listStateDescriptor);
    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<KafkaRecord> out) throws Exception
    {
        List<String> values = IteratorUtils.toList(listState.get().iterator());

        if (CollectionUtils.isEmpty(values))
        {
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10min);
        }

        if (!values.contains(value.getValue()))
        {
            values.add(value.getValue());
            listState.update(values);
        }
        
        if (values.size() >= 20)
        {
            ...
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<KafkaRecord> out)
    {
        listState.clear();
    }
}

一旦 RocksDB 达到工作状态不再适合内存的程度,预计会出现一些减速。但是,在这种情况下,您应该能够通过从 ValueState 切换到 MapState.

来显着提高性能

目前您正在反序列化和重新序列化每条记录的整个 hashSet。随着这些哈希集随时间增长,性能会下降。

RocksDB 状态后端具有 MapState 的优化实现。地图中每个单独的 key/value 条目都存储为单独的 RocksDB 对象,因此您可以查找、插入和更新条目,而无需在地图的其余部分执行 serde。

ListState 也针对 RocksDB 进行了优化(它可以在不反序列化列表的情况下附加到)。一般来说,最好避免在使用 RocksDB 时将集合存储在 ValueState 中,并尽可能使用 ListStateMapState

由于基于堆的状态后端将其工作状态保持为堆上的对象,因此不存在相同的问题。