使用 Ignite 集群中另一个缓存中的数据丰富缓存中的每个现有值
Enrich each existing value in a cache with the data from another cache in an Ignite cluster
用同一集群中另一个缓存中的数据以最高效的方式更新 Ignite 缓存中每个现有值的字段的最佳方法是什么(数千万条记录,每条记录约 1 KB)?
伪代码:
try (mappings = getCache("mappings")) {
try (entities = getCache("entities")) {
entities.foreach((key, entity) -> entity.setInternalId(mappings.getValue(entity.getExternalId());
}
}
我建议使用计算并向缓存拓扑中的所有节点发送一个闭包。然后,在每个节点上,您将遍历本地主集并进行更新。即使使用这种方法,您仍然最好将更新分批处理并通过 putAll 调用(或者可能使用 IgniteDataStreamer)发布它们。
注意:对于下面的示例,重要的是 "mappings" 和 "entities" 缓存中的键是相同的或位于同一位置。更多关于搭配的信息在这里:
https://apacheignite.readme.io/docs/affinity-collocation
伪代码看起来像这样:
ClusterGroup cacheNodes = ignite.cluster().forCache("mappings");
IgniteCompute compute = ignite.compute(cacheNodes.nodes());
compute.broadcast(() -> {
IgniteCache<> mappings = getCache("mappings");
IgniteCache<> entities = getCache("entities");
// Iterate over local primary entries.
entities.localEntries(CachePeekMode.PRIMARY).forEach((entry) -> {
V1 mappingVal = mappings.get(entry.getKey());
V2 entityVal = entry.getValue();
V2 newEntityVal = // do enrichment;
// It would be better to create a batch, and then call putAll(...)
// Using simple put call for simplicity.
entities.put(entry.getKey(), newEntityVal);
}
});
用同一集群中另一个缓存中的数据以最高效的方式更新 Ignite 缓存中每个现有值的字段的最佳方法是什么(数千万条记录,每条记录约 1 KB)?
伪代码:
try (mappings = getCache("mappings")) {
try (entities = getCache("entities")) {
entities.foreach((key, entity) -> entity.setInternalId(mappings.getValue(entity.getExternalId());
}
}
我建议使用计算并向缓存拓扑中的所有节点发送一个闭包。然后,在每个节点上,您将遍历本地主集并进行更新。即使使用这种方法,您仍然最好将更新分批处理并通过 putAll 调用(或者可能使用 IgniteDataStreamer)发布它们。
注意:对于下面的示例,重要的是 "mappings" 和 "entities" 缓存中的键是相同的或位于同一位置。更多关于搭配的信息在这里: https://apacheignite.readme.io/docs/affinity-collocation
伪代码看起来像这样:
ClusterGroup cacheNodes = ignite.cluster().forCache("mappings");
IgniteCompute compute = ignite.compute(cacheNodes.nodes());
compute.broadcast(() -> {
IgniteCache<> mappings = getCache("mappings");
IgniteCache<> entities = getCache("entities");
// Iterate over local primary entries.
entities.localEntries(CachePeekMode.PRIMARY).forEach((entry) -> {
V1 mappingVal = mappings.get(entry.getKey());
V2 entityVal = entry.getValue();
V2 newEntityVal = // do enrichment;
// It would be better to create a batch, and then call putAll(...)
// Using simple put call for simplicity.
entities.put(entry.getKey(), newEntityVal);
}
});