NiFi Rest API 在获取远程进程组详细信息时出现问题
Issue with NiFi Rest API in fetching Remote Process Group details
使用下面提到的 NiFi Rest API 端点和代码片段,
我正在获取远程进程组 (RPG) 列表,迭代并获取每个 RPG 详细信息。问题是,我得到的 RPG 数据不准确。如果我命中此端点 (https://nifihost:8080/nifi-api/remote-process-groups/{id}
),我将收到准确的详细信息。请澄清,
- 为什么这两个结果有出入
终点?
(
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
比。
https://nifihost:8080/nifi-api/remote-process-groups/{id}
)
- 因为我的要求是遍历每个过程组,得到一个列表
其中的远程进程组 (RPG) 并获取每个 RPG 详细信息?什么是正确的方法
要做到这一点?
端点:
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
源代码
ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
String returnedJSON = "";
String remoteProcessGroupURL = getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";
HttpEntity httpEntity = RestCall.oAuthHeaders(token);
RestTemplate restTemplate = new RestTemplate();
try{
ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
returnedJSON = response.getBody();
}
catch(Exception e){
logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
}
try{
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(returnedJSON);
JsonNode processorNode = rootNode.path("remoteProcessGroups");
Iterator<JsonNode> elements = processorNode.elements();
while(elements.hasNext()){
JsonNode remoteProcessGroup = elements.next();
JsonNode statusElement = remoteProcessGroup.path("status");
JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
JsonNode componentElement = remoteProcessGroup.path("component");
JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");
NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
}
}
catch(Exception e){
logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
}
- 'process-groups/{id}/remote-process-groups' 是 ProcessGroupsAPI 小节的一部分,并且将 return 一个 RemoteProcessGroupsEntity,其中包含与您提交的 ID 的 ProcessGroup 绑定的远程进程组列表.
- 'remote-process-groups/{id}' 是 RemoteProcessGroups API 的一部分,并将获取请求的确切 RemoteProcessGroupEntity(注意缺少复数)。
我为 NiFi 维护名义上的 Python 客户端,考虑到您提到的搜索结果,我建议您可以尝试:
import nipyapi
nipyapi.utils.set_endpoint('http://localhost:8080/nifi')
rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]
RPG 信息 returned 将为您提供 .component.parent_group_id 下的父 ProcessGroup ID,允许您重建树,但您应该会发现它比单独查找每个更高效。
使用下面提到的 NiFi Rest API 端点和代码片段,
我正在获取远程进程组 (RPG) 列表,迭代并获取每个 RPG 详细信息。问题是,我得到的 RPG 数据不准确。如果我命中此端点 (https://nifihost:8080/nifi-api/remote-process-groups/{id}
),我将收到准确的详细信息。请澄清,
- 为什么这两个结果有出入
终点?
(
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
比。https://nifihost:8080/nifi-api/remote-process-groups/{id}
) - 因为我的要求是遍历每个过程组,得到一个列表 其中的远程进程组 (RPG) 并获取每个 RPG 详细信息?什么是正确的方法 要做到这一点?
端点:
https://nifihost:8080/nifi-api/process-groups/{id}/remote-process-groups
源代码
ArrayList<NifiRemoteProcessGroup> remoteProcessGroupArrayList = new ArrayList<>();
String returnedJSON = "";
String remoteProcessGroupURL = getNifiURL() + "/nifi-api/process-groups/" + processGroup + "/remote-process-groups";
HttpEntity httpEntity = RestCall.oAuthHeaders(token);
RestTemplate restTemplate = new RestTemplate();
try{
ResponseEntity<String> response = restTemplate.exchange(remoteProcessGroupURL,HttpMethod.GET,httpEntity,String.class);
returnedJSON = response.getBody();
}
catch(Exception e){
logger.error("There was an error retrieving the remote-process-groups : " + e.getMessage());
}
try{
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(returnedJSON);
JsonNode processorNode = rootNode.path("remoteProcessGroups");
Iterator<JsonNode> elements = processorNode.elements();
while(elements.hasNext()){
JsonNode remoteProcessGroup = elements.next();
JsonNode statusElement = remoteProcessGroup.path("status");
JsonNode bulletinElement = remoteProcessGroup.path("bulletins");
JsonNode componentElement = remoteProcessGroup.path("component");
JsonNode aggregateSnapshot = statusElement.path("aggregateSnapshot");
NifiRemoteProcessGroup remoteProcessGroupInstance = new NifiRemoteProcessGroup();
remoteProcessGroupInstance.setRemoteProcessGroupId(checkExists(statusElement,"id"));
remoteProcessGroupInstance.setRemoteProcessGroupName(checkExists(componentElement,"name"));
remoteProcessGroupInstance.setRemoteProcessGroupGroupId(checkExists(statusElement,"groupId"));
remoteProcessGroupInstance.setRemoteProcessGroupTargetURL(checkExists(componentElement,"targetUri"));
remoteProcessGroupInstance.setRemoteProcessGroupBulletins(bulletinElement.asText());
remoteProcessGroupInstance.setRemoteProcessGroupTransmitting(Boolean.valueOf(checkExists(componentElement,"transmitting")));
remoteProcessGroupInstance.setRemoteProcessGroupTransmissionStatus(checkExists(statusElement,"transmissionStatus"));
remoteProcessGroupInstance.setRemoteProcessGroupActiveThreadCount(Double.valueOf(checkExists(aggregateSnapshot,"activeThreadCount")));
remoteProcessGroupInstance.setRemoteProcessGroupFlowFilesReceived(Double.valueOf(checkExists(aggregateSnapshot,"flowFilesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupBytesReceived(Double.valueOf(checkExists(aggregateSnapshot,"bytesReceived")));
remoteProcessGroupInstance.setRemoteProcessGroupReceived(checkExists(aggregateSnapshot,"received"));
remoteProcessGroupArrayList.add(remoteProcessGroupInstance);
}
}
catch(Exception e){
logger.info("There was an error creating the list of remote process groups: " + e.getMessage());
}
- 'process-groups/{id}/remote-process-groups' 是 ProcessGroupsAPI 小节的一部分,并且将 return 一个 RemoteProcessGroupsEntity,其中包含与您提交的 ID 的 ProcessGroup 绑定的远程进程组列表.
- 'remote-process-groups/{id}' 是 RemoteProcessGroups API 的一部分,并将获取请求的确切 RemoteProcessGroupEntity(注意缺少复数)。
我为 NiFi 维护名义上的 Python 客户端,考虑到您提到的搜索结果,我建议您可以尝试:
import nipyapi
nipyapi.utils.set_endpoint('http://localhost:8080/nifi')
rpg_info = [nipyapi.canvas.get_remote_process_group(rpg.id) for rpg in nipyapi.canvas.list_all_remote_process_groups('root', True)]
RPG 信息 returned 将为您提供 .component.parent_group_id 下的父 ProcessGroup ID,允许您重建树,但您应该会发现它比单独查找每个更高效。