以编程方式读取 Nifi 计数器值
Read Nifi Counter value programmatically
我正在开发一个自定义处理器,我想在其中读取 Nifi 计数器的值。除了使用 Nifi Rest Api "http://nifi-host:port/nifi-api/counters"?
之外,还有其他方法可以读取计数器的值吗?
没有。 Apache NiFi 没有任何直接的 APIs 可用于以编程方式读取计数器值。一种简单的方法是使用 GetHTTP
处理器并使用您提到的 NiFi REST API URL:http(s)://nifi-host:port/nifi-api/counters
.
然后使用 EvaluateJsonPath
从 GetHTTP
处理器收到的响应 JSON 中解析和读取计数器值。
虽然 read/write 计数器值不像修改流文件属性那么容易,但 Apache NiFi 确实有用于修改计数器的 API。然而,计数器的目的是为人类用户提供信息,而不是让处理器根据它们的值做出决定。根据您要完成的目标,使用本地地图或 DistributedMapCacheServer
and DistributedMapCacheClientService
. If the values are only relevant to this processor, you can just use an in-memory map to store and retrieve the values. If you need to communicate with other processors, use the cache () 可能会更成功。
Pierre Villard 有 written a good tutorial about using counters, and you can use ProcessSession#adjustCounter(String counter, int delta, boolean immediate)
to modify counter values. Because counters were not designed to allow programmatic there is no way to retrieve the CounterRepository
instance from the RepositoryContext
object. You may also want to read about Reporting Tasks
,因为根据您的目标,这可能是更好的实现方式。
根据安迪的建议,我使用反射读取计数器如下:
private void printCounters(ProcessSession session) throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Class standardProcessSession=session.getClass();
Field fieldContext = standardProcessSession.getDeclaredField("context");
fieldContext.setAccessible(true);
Object processContext = fieldContext.get(session);
Class processContextClass = processContext.getClass();
Field fieldCounterRepo = processContextClass.getDeclaredField("counterRepo");
fieldCounterRepo.setAccessible(true);
Object counterRepo = fieldCounterRepo.get(processContext);
Method declaredMethod = counterRepo.getClass().getDeclaredMethod("getCounters");
ArrayList<Object> counters = (ArrayList<Object>)declaredMethod.invoke(counterRepo);
for(Object obj:counters) {
Method methodName = obj.getClass().getDeclaredMethod("getName");
methodName.setAccessible(true);
Method methodVal = obj.getClass().getDeclaredMethod("getValue");
methodVal.setAccessible(true);
System.out.println("Counter name: "+methodName.invoke(obj));
System.out.println("Counter value: "+methodVal.invoke(obj));
}
}
注意: NIFI 版本为 1.5.0
我正在开发一个自定义处理器,我想在其中读取 Nifi 计数器的值。除了使用 Nifi Rest Api "http://nifi-host:port/nifi-api/counters"?
之外,还有其他方法可以读取计数器的值吗?没有。 Apache NiFi 没有任何直接的 APIs 可用于以编程方式读取计数器值。一种简单的方法是使用 GetHTTP
处理器并使用您提到的 NiFi REST API URL:http(s)://nifi-host:port/nifi-api/counters
.
然后使用 EvaluateJsonPath
从 GetHTTP
处理器收到的响应 JSON 中解析和读取计数器值。
虽然 read/write 计数器值不像修改流文件属性那么容易,但 Apache NiFi 确实有用于修改计数器的 API。然而,计数器的目的是为人类用户提供信息,而不是让处理器根据它们的值做出决定。根据您要完成的目标,使用本地地图或 DistributedMapCacheServer
and DistributedMapCacheClientService
. If the values are only relevant to this processor, you can just use an in-memory map to store and retrieve the values. If you need to communicate with other processors, use the cache (
Pierre Villard 有 written a good tutorial about using counters, and you can use ProcessSession#adjustCounter(String counter, int delta, boolean immediate)
to modify counter values. Because counters were not designed to allow programmatic there is no way to retrieve the CounterRepository
instance from the RepositoryContext
object. You may also want to read about Reporting Tasks
,因为根据您的目标,这可能是更好的实现方式。
根据安迪的建议,我使用反射读取计数器如下:
private void printCounters(ProcessSession session) throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Class standardProcessSession=session.getClass();
Field fieldContext = standardProcessSession.getDeclaredField("context");
fieldContext.setAccessible(true);
Object processContext = fieldContext.get(session);
Class processContextClass = processContext.getClass();
Field fieldCounterRepo = processContextClass.getDeclaredField("counterRepo");
fieldCounterRepo.setAccessible(true);
Object counterRepo = fieldCounterRepo.get(processContext);
Method declaredMethod = counterRepo.getClass().getDeclaredMethod("getCounters");
ArrayList<Object> counters = (ArrayList<Object>)declaredMethod.invoke(counterRepo);
for(Object obj:counters) {
Method methodName = obj.getClass().getDeclaredMethod("getName");
methodName.setAccessible(true);
Method methodVal = obj.getClass().getDeclaredMethod("getValue");
methodVal.setAccessible(true);
System.out.println("Counter name: "+methodName.invoke(obj));
System.out.println("Counter value: "+methodVal.invoke(obj));
}
}
注意: NIFI 版本为 1.5.0