如何解决流程功能中的繁忙时间问题?
How can I solve busy time problem in process function?
我有一个带有无界流的 flink(v1.13.3) 应用程序(使用 kafka)。我的一个流很忙。随着时间的推移,繁忙值(我可以在 UI 上看到)也会增加。当我刚启动flink应用时:
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
returns 300-450 毫秒
- 五小时后
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
returns 5-7 sn.
这个功能很简单,状态后端使用rocksdb:
public class MyObj implements Serializable
{
private Set<String> distinctValues;
public MyObj()
{
this.distinctValues = new HashSet<>();
}
public Set<String> getDistinctValues() {
return distinctValues;
}
public void setDistinctValues(Set<String> values) {
this.distinctValues = values;
}
}
public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ValueState<MyObj> state;
@Override
public void open(Configuration parameters)
{
ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
TypeInformation.of(MyObj.class));
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
{
MyObj stateValue = state.value();
if (stateValue == null)
{
stateValue = new MyObj();
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10mins);
}
stateValue.getDistinctValues().add(value.getValue());
if (stateValue.getDistinctValues().size() >= 20)
{
state.clear();
}
else
{
state.update(stateValue);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
{
state.clear();
}
}
注意:在实现 valueState 之前,我只是使用 ListState。但是使用 with listState flink_taskmanager_job_task_busyTimeMsPerSecond
returns 25-30sn:
public class MyProcessFunction extends extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ListState<String> listState;
@Override
public void open(Configuration parameters)
{
ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<KafkaRecord> out) throws Exception
{
List<String> values = IteratorUtils.toList(listState.get().iterator());
if (CollectionUtils.isEmpty(values))
{
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10min);
}
if (!values.contains(value.getValue()))
{
values.add(value.getValue());
listState.update(values);
}
if (values.size() >= 20)
{
...
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<KafkaRecord> out)
{
listState.clear();
}
}
一旦 RocksDB 达到工作状态不再适合内存的程度,预计会出现一些减速。但是,在这种情况下,您应该能够通过从 ValueState
切换到 MapState
.
来显着提高性能
目前您正在反序列化和重新序列化每条记录的整个 hashSet。随着这些哈希集随时间增长,性能会下降。
RocksDB 状态后端具有 MapState
的优化实现。地图中每个单独的 key/value 条目都存储为单独的 RocksDB 对象,因此您可以查找、插入和更新条目,而无需在地图的其余部分执行 serde。
ListState
也针对 RocksDB 进行了优化(它可以在不反序列化列表的情况下附加到)。一般来说,最好避免在使用 RocksDB 时将集合存储在 ValueState
中,并尽可能使用 ListState
或 MapState
。
由于基于堆的状态后端将其工作状态保持为堆上的对象,因此不存在相同的问题。
我有一个带有无界流的 flink(v1.13.3) 应用程序(使用 kafka)。我的一个流很忙。随着时间的推移,繁忙值(我可以在 UI 上看到)也会增加。当我刚启动flink应用时:
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
returns 300-450 毫秒- 五小时后
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
returns 5-7 sn.
这个功能很简单,状态后端使用rocksdb:
public class MyObj implements Serializable
{
private Set<String> distinctValues;
public MyObj()
{
this.distinctValues = new HashSet<>();
}
public Set<String> getDistinctValues() {
return distinctValues;
}
public void setDistinctValues(Set<String> values) {
this.distinctValues = values;
}
}
public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ValueState<MyObj> state;
@Override
public void open(Configuration parameters)
{
ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
TypeInformation.of(MyObj.class));
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
{
MyObj stateValue = state.value();
if (stateValue == null)
{
stateValue = new MyObj();
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10mins);
}
stateValue.getDistinctValues().add(value.getValue());
if (stateValue.getDistinctValues().size() >= 20)
{
state.clear();
}
else
{
state.update(stateValue);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
{
state.clear();
}
}
注意:在实现 valueState 之前,我只是使用 ListState。但是使用 with listState flink_taskmanager_job_task_busyTimeMsPerSecond
returns 25-30sn:
public class MyProcessFunction extends extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ListState<String> listState;
@Override
public void open(Configuration parameters)
{
ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<KafkaRecord> out) throws Exception
{
List<String> values = IteratorUtils.toList(listState.get().iterator());
if (CollectionUtils.isEmpty(values))
{
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10min);
}
if (!values.contains(value.getValue()))
{
values.add(value.getValue());
listState.update(values);
}
if (values.size() >= 20)
{
...
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<KafkaRecord> out)
{
listState.clear();
}
}
一旦 RocksDB 达到工作状态不再适合内存的程度,预计会出现一些减速。但是,在这种情况下,您应该能够通过从 ValueState
切换到 MapState
.
目前您正在反序列化和重新序列化每条记录的整个 hashSet。随着这些哈希集随时间增长,性能会下降。
RocksDB 状态后端具有 MapState
的优化实现。地图中每个单独的 key/value 条目都存储为单独的 RocksDB 对象,因此您可以查找、插入和更新条目,而无需在地图的其余部分执行 serde。
ListState
也针对 RocksDB 进行了优化(它可以在不反序列化列表的情况下附加到)。一般来说,最好避免在使用 RocksDB 时将集合存储在 ValueState
中,并尽可能使用 ListState
或 MapState
。
由于基于堆的状态后端将其工作状态保持为堆上的对象,因此不存在相同的问题。