存储自定义 NiFi 处理器所需的数据

Store data required by a custom NiFi processor

HDP-2.5.3.0, NiFi 1.1.1

我正在用 NiFi 编写自定义处理器。我需要将几个字符串和时间戳字段存储在某个地方,以便在 all/any 个节点上可用。

@Tags({ "example" })
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class MyProcessor extends AbstractProcessor {
.
.
.
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;

 /* Persist these, probably, in ZK */
private Timestamp lastRunAt;
private String startPoint;
.
.
.

@Override
public void onTrigger(final ProcessContext context,final ProcessSession session) throws ProcessException {FlowFile flowFile = session.get();

/*Retrieve lastRunAt & startPoint and use*/
lastRunAt ;
startPoint ;
.
.
.
}
}

请注意,HDFS 不是一个选项,因为 NiFi 可能 运行 图片中没有任何 Hadoop 安装。

执行此操作的选项是什么 - 我想知道是否可以使用 Zookeeper 来存储此数据,因为它的大小很小并且 NiFi 由 ZK 支持。我试图找到使用 Zookeeper API 来持久化这些字段的方法,但没有成功。

NiFi 公开了一个称为 "state manager" 的概念,供处理器存储这样的信息。当 运行 独立的 NiFi 有一个本地状态管理器,当 运行 集群时有一个 ZooKeeper 状态管理器。

在此处查看开发人员指南:

https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#state_manager

此外,NiFi 中的许多源处理器都使用了它,因此您可以在代码中查找示例:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java#L249

状态提供者配置管理指南:

https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#state_management