跨同一处理器不同实例的 StateMap 键

StateMap keys across different instances of the same processor

Nifi 1.2.0.

在自定义处理器中,LSN 用于从 SQL 服务器数据库 table 获取数据。

以下是用于以下用途的代码片段:

存储键值对

final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();

String lsnUsedDuringLastLoadStr = Base64.getEncoder().encodeToString(lsnUsedDuringLastLoad);
//Just a constant String used as key
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, lsnUsedDuringLastLoadStr);


if (stateMap.getVersion() == -1) {
stateManager.setState(newStateMapProperties, Scope.CLUSTER);
} else {
stateManager.replace(stateMap, newStateMapProperties, Scope.CLUSTER);
}
}

正在检索键值对

final StateManager stateManager = context.getStateManager();
final StateMap stateMap;
final Map<String, String> stateMapProperties;
byte[] lastMaxLSN = null;
try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMapProperties = new HashMap<>(stateMap.toMap());

lastMaxLSN = (stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN) == null
|| stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
: Base64.getDecoder()
.decode(stateMapProperties.get(ProcessorConstants.LAST_MAX_LSN).getBytes());
}

当此处理器的单个实例为 运行 时,LSN 被正确存储和检索,并且从 SQL 服务器 tables 获取数据的逻辑工作正常。

根据 NiFi 文档。关于状态管理:

Storing and Retrieving State State is stored using the StateManager’s getState, setState, replace, and clear methods. All of these methods require that a Scope be provided. It should be noted that the state that is stored with the Local scope is entirely different than state stored with a Cluster scope. If a Processor stores a value with the key of My Key using the Scope.CLUSTER scope, and then attempts to retrieve the value using the Scope.LOCAL scope, the value retrieved will be null (unless a value was also stored with the same key using the Scope.CLUSTER scope). Each Processor’s state, is stored in isolation from other Processors' state.

当此处理器的两个实例 运行 时,只有一个能够获取数据。这导致了以下问题:

StateMap 是一个 'global map',它必须在同一处理器的实例和不同处理器的实例之间具有 唯一键吗? 简单来说,每当一个处理器在状态图中放置一个密钥,该密钥在 NiFi 处理器(以及其他使用状态 API 的服务,如果有的话)中应该是唯一的?如果是,谁能建议我应该在我的案例中使用什么唯一密钥?

注意:我快速浏览了标准MySQL CDC处理器代码class(CaptureChangeMySQL.java),它有类似的逻辑存储和检索状态,但我是不是忽略了什么?

处理器的 StateMap 存储在组件的 ID 下,因此如果您有两个相同类型处理器的实例(意味着您可以在 canvas 上看到两个处理器),您将拥有一些东西喜欢:

/components/1111-1111-1111-1111 -> serialized state map
/components/2222-2222-2222-2222 -> serialized state map

假设 1111-1111-1111-1111 是处理器 1 的 UUID,2222-2222-22222-2222 是处理器 2 的 UUID。因此 StateMap 中的键不必在所有实例中都是唯一的因为它们是按组件 ID 划分的。

在集群中,每个组件的组件id在所有节点上都是相同的。因此,如果您有一个 3 节点集群并且处理器 1 的 ID 为 1111-1111-1111-1111,那么每个节点上都有一个具有该 ID 的处理器。

如果该处理器被安排在所有节点上 运行 并存储集群状态,那么处理器的所有三个实例将更新集群状态提供程序 (ZooKeeper) 中的同一个 StateMap。