Issue with NiFi REST API in fetching Remote Process Group details

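Using the NiFi REST API endpoint and code snippet below, I fetch the list of Remote Process Groups (RPGs), iterate over it, and read each RPG's details. The problem is that the RPG data I get this way is inaccurate. If I hit this endpoint instead (https://nifihost:8080/nifi-api/remote-process-groups/{id}), I receive accurate details. Please clarify: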

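  1. Why do the results of these two endpoints differ? (https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups vs. https://nifihost:8080/nifi-api/remote-process-groups/{id})
  2. My requirement is to iterate over every process group, get the list of Remote Process Groups (RPGs) in it, and fetch each RPG's details. What is the right way to do this?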

Endpoint:

https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups

Source code:

    ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
    String returnedJSON = "";
    String remoteProcessGroupURL =  getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";

    HttpEntity httpEntity = RestCall.oAuthHeaders(token);
    RestTemplate restTemplate = new RestTemplate();

    try{
        ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
        returnedJSON = response.getBody();
    }
    catch(Exception e){
        logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
    }

    try{
        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode rootNode = objectMapper.readTree(returnedJSON);
        JsonNode processorNode = rootNode.path("remoteProcessGroups");
        Iterator<JsonNode> elements = processorNode.elements();

        while(elements.hasNext()){
            JsonNode remoteProcessGroup = elements.next();
            JsonNode statusElement = remoteProcessGroup.path("status");
            JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
            JsonNode componentElement = remoteProcessGroup.path("component");
            JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");

            NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
            remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
            remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
            remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
            remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
            remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
            remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
            remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
            remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
            remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
            remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
            remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
            remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
        }
    }
    catch(Exception e){
        logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
    }

  1. 'process-groups/{id}/remote-process-groups' is part of the ProcessGroups API subsection and returns a RemoteProcessGroupsEntity, which contains the list of Remote Process Groups bound to the ProcessGroup whose ID you submit.
  2. 'remote-process-groups/{id}' is part of the RemoteProcessGroups API and fetches the exact RemoteProcessGroupEntity you request (note the lack of plural).
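
To make the difference concrete, here is a minimal sketch in plain Python (standard library only) of the two-step pattern: read the IDs out of a list response, then build the per-RPG detail URL for each one. The helper names and the trimmed sample payload are illustrative assumptions; only the `remoteProcessGroups` key, the host, and the two endpoint paths come from the question.

```python
import json

BASE_URL = "https://nifihost:8080/nifi-api"  # host as written in the question


def list_url(process_group_id):
    """URL that lists the RPGs directly inside one process group."""
    return f"{BASE_URL}/process-groups/{process_group_id}/remote-process-groups"


def detail_url(rpg_id):
    """URL that returns a single RPG entity."""
    return f"{BASE_URL}/remote-process-groups/{rpg_id}"


def rpg_ids(list_response_json):
    """Extract RPG IDs from the JSON body returned by the list endpoint."""
    body = json.loads(list_response_json)
    return [entity["id"] for entity in body.get("remoteProcessGroups", [])]


# Hypothetical, heavily trimmed list response, for illustration only.
sample = json.dumps({"remoteProcessGroups": [{"id": "rpg-1"}, {"id": "rpg-2"}]})
detail_urls = [detail_url(rpg_id) for rpg_id in rpg_ids(sample)]
print(detail_urls)
```

In a real client, each URL in `detail_urls` would be fetched with the same authenticated GET the question already uses for the list call.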

I maintain a Python client for NiFi (nipyapi). Given the outcome you are after, I suggest you try:

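    import nipyapi
    # set_endpoint expects the API base URL, which ends in /nifi-api
    nipyapi.utils.set_endpoint('http://localhost:8080/nifi-api')
    # List all RPGs under root (descendants included), then fetch each by ID
    rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]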

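The RPG info returned gives you the parent ProcessGroup ID under .component.parent_group_id, which lets you reconstruct the tree, and you should find this more performant than walking each process group and looking up its RPGs individually.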
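
Rebuilding the tree from that attribute could look roughly like the sketch below. It only assumes that each item exposes `.id` and `.component.parent_group_id`, as described above; the `group_by_parent` and `fake_rpg` helpers and the sample IDs are hypothetical stand-ins so the snippet runs without a NiFi instance.

```python
from collections import defaultdict
from types import SimpleNamespace


def group_by_parent(rpg_info):
    """Map each parent process group ID to the IDs of the RPGs it contains."""
    tree = defaultdict(list)
    for rpg in rpg_info:
        tree[rpg.component.parent_group_id].append(rpg.id)
    return dict(tree)


def fake_rpg(rpg_id, parent_id):
    """Stand-in for an RPG entity; real ones would come from nipyapi."""
    return SimpleNamespace(
        id=rpg_id,
        component=SimpleNamespace(parent_group_id=parent_id),
    )


# Hypothetical IDs, for illustration only.
sample_info = [
    fake_rpg("rpg-1", "pg-a"),
    fake_rpg("rpg-2", "pg-b"),
    fake_rpg("rpg-3", "pg-a"),
]
rpgs_by_parent = group_by_parent(sample_info)
print(rpgs_by_parent)
```

Passing the `rpg_info` list from the snippet above in place of `sample_info` should give one entry per process group that contains at least one RPG.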