如何从 NiFi 1.11.4 中的 processContext 获取连接列表

How do I get the get list of connections from processContext in NiFi 1.11.4

我们的 NiFi 生产实例是 1.8.0 版本。我们有一个自定义处理器,它不断查看它的下游连接,以便根据连接的队列大小路由流文件。

这是我们如何做到这一点的重要片段。 . .

String processorId = this.getIdentifier();

ProcessorGroupStatus processGroupStatus = ((EventAccess) getControllerService()).getContollerStatus();
Collection<ConnectionStatus> groupConnections = processGroupStatus.getConnectionStatus();
ArrayList connections = new ArrayList<>(groupConnections);

for (Object processorConnection : connections) {

    ConnectionStatus connection = (ConnectionStatus) processorConnection;
    if(connection.getSourceId().equals(processorId){

        //do stuff with connection.getQueuedCount() & connection.getQueuedBytes()
        break;
    }
}

过去几年一切都按预期进行。但是,将我们的 NiFi 实例升级到版本 1.11.4 已经打破了这种方法。抛出的异常是:

class org.apache.nifi.contoller.serviceStandardContollerServiceProvider cannot be cast to class org.apache.nifi.reporting.EventAccess

还有其他方法可以从 processContext 检索连接吗?

一种可能比自定义 Java 处理器更向上兼容(并且更易于维护)的方法是使用 ExecuteGroovyScript处理器。

本例中的 Groovy 脚本类似于:

ff = session.get()
if (ff) {
  me = context.procNode
  processorId = me.identifier
  connections = me.processGroup.connections

  connections.each { connection ->
    if(connection.source.identifier.equals(processorId)) {
      ff[connection.identifier] = "I am the source " +
         "[" + connection.flowFileQueue.size().objectCount + "]" +
         "[" + connection.flowFileQueue.size().byteCount + "]"
    }
    else {
      ff[connection.identifier] = "I am NOT the source; my name is [" + connection.name + "]"
    }
  }

  REL_SUCCESS << ff
}

为了找出 Groovy 脚本可用的内容,我结合使用了 NiFi JavaDocs (https://javadoc.io/static/org.apache.nifi/nifi-api/1.12.0/index.html) and the Github code for NiFi (https://github.com/apache/nifi/tree/c396927299586b896df4ebc745793b4c451f3898/nifi-api/src/main/java/org/apache/nifi).

附带说明一下,我们将自定义 Java 处理器转换为 Groovy 脚本,因为升级到 时不兼容(讽刺的是)1.8 .0。从那时起我们就没有遇到过 NiFi 升级的问题,目前是 运行 v 1.11.4.