收集 flink flatmap 函数所用平均时间的指标

Collecting metrics for average time taken by flink flatmap function

我已经实现了一个 flink flatmap 函数,我想为这个函数收集我计划通过 prometheus 监控的平均时间指标。

有什么好的方法吗?我在方法中添加了一个量规(从 flink API 扩展 Gauge 接口)。

public class SimpleGauge<T> implements Gauge<T> {

    private T mValue;

    @Override
    public T getValue() {
        return mValue;
    }

    public void setValue(T value){
        mValue = value;
    }
}

然后我从 flatmap 函数调用 setValue:

float endTime = (System.currentTimeMillis() - startTime) / 1000F;
this.gauge.setValue(endTime);

有用吗?

它是否有效取决于您如何创建 Gauge。如果您使用的是普通 FlatMapFunction,您可能应该切换到 RichFlatMapFunction,然后初始化并注册您的仪表 getRuntimeContext

此外,您可能需要阅读一些关于度量类型的内容,因为在这种情况下 Histogram 似乎比 Gauge 更好,因为在大多数情况下测量平均延迟通常不是最好的主意个案。