如何为自定义处理器定义新指标(并使它们在 jconsole 中可用)?

How to define new metrics for custom Processor (and make them available in jconsole)?

我有一个应该生成 kstream JMX 指标的处理器:

public class ProcessorJMX implements Processor<String, GenericRecord> {
  private StreamsMetrics streamsMetrics;
  private Sensor sensorStartTs;

  @Override
  public void init(ProcessorContext processorContext) {
    streamsMetrics = processorContext.metrics();
    sensorStartTs = streamsMetrics.addSensor("start_ts", Sensor.RecordingLevel.INFO);
  } 
  @Override
  public void process(String key, GenericRecord val) {
    streamsMetrics.recordThroughput(sensorStartTs, Long.valueOf(val.get("start_ts").toString()));
  }
  @Override
  public void punctuate(long l) { }

  @Override
  public void close() { }
}

然后我在我的输出主题上使用它并开始我的集成测试。但是当我查看 jconsole 时,我在任何地方都看不到这个指标。我在哪里可以在 MBean 下的 jconsole 中找到它?

在它变得可见之前我还需要做其他事情吗?

这是我正在使用的属性:

Properties testProperties = new Properties();
testProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
testProperties.put("confluent.metrics.reporter.bootstrap.servers", CLUSTER.bootstrapServers());
testProperties.put("metrics.recording.level", "DEBUG");
testProperties.put("metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");

这个配置有什么问题?

以下是我在init中添加的:

@Override
public void init(ProcessorContext processorContext) {
    streamsMetrics = processorContext.metrics();

    Map<String, String> metricTags = new HashMap<String, String>();
    metricTags.put("metricTagKey", "metricsTagVal");

    MetricConfig metricConfig = new MetricConfig().tags(metricTags);
    Metrics metrics = new Metrics(metricConfig);
    sensorStartTs = metrics.sensor("start_ts");
    MetricName metricName = metrics.metricName("x-name", "x-group", "x-description");
    sensorStartTs = streamsMetrics.addSensor("start_ts", Sensor.RecordingLevel.INFO);
    sensorStartTs.add(metricName, new Min());
}

MetricName class 有帮助。