以编程方式读取 Nifi 计数器值

Read Nifi Counter value programmatically

我正在开发一个自定义处理器,我想在其中读取 Nifi 计数器的值。除了使用 Nifi Rest Api "http://nifi-host:port/nifi-api/counters"?

之外,还有其他方法可以读取计数器的值吗?

没有。 Apache NiFi 没有任何直接的 APIs 可用于以编程方式读取计数器值。一种简单的方法是使用 GetHTTP 处理器并使用您提到的 NiFi REST API URL:http(s)://nifi-host:port/nifi-api/counters.

然后使用 EvaluateJsonPathGetHTTP 处理器收到的响应 JSON 中解析和读取计数器值。

虽然 read/write 计数器值不像修改流文件属性那么容易,但 Apache NiFi 确实有用于修改计数器的 API。然而,计数器的目的是为人类用户提供信息,而不是让处理器根据它们的值做出决定。根据您要完成的目标,使用本地地图或 DistributedMapCacheServer and DistributedMapCacheClientService. If the values are only relevant to this processor, you can just use an in-memory map to store and retrieve the values. If you need to communicate with other processors, use the cache () 可能会更成功。

Pierre Villard 有 written a good tutorial about using counters, and you can use ProcessSession#adjustCounter(String counter, int delta, boolean immediate) to modify counter values. Because counters were not designed to allow programmatic there is no way to retrieve the CounterRepository instance from the RepositoryContext object. You may also want to read about Reporting Tasks,因为根据您的目标,这可能是更好的实现方式。

根据安迪的建议,我使用反射读取计数器如下:

private void  printCounters(ProcessSession session) throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
        Class standardProcessSession=session.getClass();
        Field fieldContext = standardProcessSession.getDeclaredField("context");
        fieldContext.setAccessible(true);
        Object processContext =  fieldContext.get(session);
        Class processContextClass = processContext.getClass();
        Field fieldCounterRepo = processContextClass.getDeclaredField("counterRepo");
        fieldCounterRepo.setAccessible(true);
        Object counterRepo = fieldCounterRepo.get(processContext);
        Method declaredMethod = counterRepo.getClass().getDeclaredMethod("getCounters");

        ArrayList<Object> counters = (ArrayList<Object>)declaredMethod.invoke(counterRepo);
        for(Object obj:counters) {
            Method methodName = obj.getClass().getDeclaredMethod("getName");
            methodName.setAccessible(true);
            Method methodVal = obj.getClass().getDeclaredMethod("getValue");
            methodVal.setAccessible(true);
            System.out.println("Counter name: "+methodName.invoke(obj));
            System.out.println("Counter value: "+methodVal.invoke(obj));
        }
    }

注意: NIFI 版本为 1.5.0