如何将 flink 用户自定义指标导出到 prometheus & grafana
How to export flink user custom metric to prometheus & grafana
我正在尝试根据 this tutorial
创建我的自定义指标变量
使用它提供的示例代码,我可以获得事件和直方图。
我对 prometheus 和 grafana 如何使用标识符感到困惑。我也尝试稍微修改示例代码,但指标不再有效。
此外,我只能访问系统指标,不能访问我自己的指标。
我的问题是:
- 如何访问我创建的计数器?例如计数器 1
- metricGroup 到底是什么?
- 例如,我想检测一个模式
来自输入流,在
指标或只是将结果输出到时间序列数据库,如
influxdb?
提前致谢。
这是地图函数
class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
private static final long serialVersionUID = 1L;
private transient Counter eventCounter;
private transient Counter customCounter1;
private transient Counter customCounter2;
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext()
.getMetricGroup().counter("events");
customCounter1 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod2")
.counter("counter1");
customCounter2 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod5")
.counter("counter2");
// meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public SensorReading map(SensorReading value) {
eventCounter.inc();
if (value.getCurrTimestamp() % 2 == 0)
customCounter1.inc();
if (value.getCurrTimestamp() % 5 == 0)
customCounter2.inc();
if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
customCounter1.dec();
return value;
}
}
示例作业:
env
.addSource(new SimpleSensorReadingGenerator())
.name(SimpleSensorReadingGenerator.class.getSimpleName())
.map(new FlinkMetricsExposingMapFunction())
.name(FlinkMetricsExposingMapFunction.class.getSimpleName())
.print()
.name(DataStreamSink.class.getSimpleName());
更新
从 grafana 访问 flink 指标的屏幕截图:
flink-config.yaml
FROM flink:1.9.0
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/
教程中的默认地图函数:
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext().getMetricGroup().counter("events");
valueHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
}
您创建的计数器可由 <system-scope>. customCounterKey.mod2.counter1
访问。 <system-scope>
定义在你的 flink-conf.yaml 中。如果您没有在此处定义它,则默认值为 <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
.
度量组基本上定义了度量名称的层次结构。根据文档,度量组是度量的命名容器。它由 3 个部分(范围)组成:系统范围(在 flink-conf.yaml 中定义)、用户范围(无论您在 addGroup()
中定义什么)和指标名称。
这取决于你想衡量什么。对于您可以检测到的计数器、量规或仪表的所有内容,我会选择指标。如果涉及到直方图,如果你使用普罗米修斯记者,你应该仔细看看你从 flink 得到了什么。 Flink 概括了所有不同的度量框架——直方图在 prometheus 中的实现方式与例如在 prometheus 中不同。石墨。桶的定义由 flink 给出,据我所知无法更改(尽管有一些反射魔法)。
所有这些都在此处进行了更详细的描述:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#registering-metrics
希望对您有所帮助。
我正在尝试根据 this tutorial
创建我的自定义指标变量使用它提供的示例代码,我可以获得事件和直方图。
我对 prometheus 和 grafana 如何使用标识符感到困惑。我也尝试稍微修改示例代码,但指标不再有效。
此外,我只能访问系统指标,不能访问我自己的指标。
我的问题是:
- 如何访问我创建的计数器?例如计数器 1
- metricGroup 到底是什么?
- 例如,我想检测一个模式 来自输入流,在 指标或只是将结果输出到时间序列数据库,如 influxdb?
提前致谢。
这是地图函数
class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
private static final long serialVersionUID = 1L;
private transient Counter eventCounter;
private transient Counter customCounter1;
private transient Counter customCounter2;
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext()
.getMetricGroup().counter("events");
customCounter1 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod2")
.counter("counter1");
customCounter2 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod5")
.counter("counter2");
// meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public SensorReading map(SensorReading value) {
eventCounter.inc();
if (value.getCurrTimestamp() % 2 == 0)
customCounter1.inc();
if (value.getCurrTimestamp() % 5 == 0)
customCounter2.inc();
if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
customCounter1.dec();
return value;
}
}
示例作业:
env
.addSource(new SimpleSensorReadingGenerator())
.name(SimpleSensorReadingGenerator.class.getSimpleName())
.map(new FlinkMetricsExposingMapFunction())
.name(FlinkMetricsExposingMapFunction.class.getSimpleName())
.print()
.name(DataStreamSink.class.getSimpleName());
更新
从 grafana 访问 flink 指标的屏幕截图:
flink-config.yaml
FROM flink:1.9.0
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/
教程中的默认地图函数:
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext().getMetricGroup().counter("events");
valueHistogram = getRuntimeContext()
.getMetricGroup()
.histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
}
您创建的计数器可由
<system-scope>. customCounterKey.mod2.counter1
访问。<system-scope>
定义在你的 flink-conf.yaml 中。如果您没有在此处定义它,则默认值为<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
.度量组基本上定义了度量名称的层次结构。根据文档,度量组是度量的命名容器。它由 3 个部分(范围)组成:系统范围(在 flink-conf.yaml 中定义)、用户范围(无论您在
addGroup()
中定义什么)和指标名称。这取决于你想衡量什么。对于您可以检测到的计数器、量规或仪表的所有内容,我会选择指标。如果涉及到直方图,如果你使用普罗米修斯记者,你应该仔细看看你从 flink 得到了什么。 Flink 概括了所有不同的度量框架——直方图在 prometheus 中的实现方式与例如在 prometheus 中不同。石墨。桶的定义由 flink 给出,据我所知无法更改(尽管有一些反射魔法)。
所有这些都在此处进行了更详细的描述:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#registering-metrics
希望对您有所帮助。