使用 Ignite 集群中另一个缓存中的数据丰富缓存中的每个现有值

Enrich each existing value in a cache with the data from another cache in an Ignite cluster

用同一集群中另一个缓存中的数据以最高效的方式更新 Ignite 缓存中每个现有值的字段的最佳方法是什么(数千万条记录,每条记录约 1 KB)?

伪代码:

try (mappings = getCache("mappings")) {
    try (entities = getCache("entities")) {
        entities.foreach((key, entity) -> entity.setInternalId(mappings.getValue(entity.getExternalId());
    }
}

我建议使用计算并向缓存拓扑中的所有节点发送一个闭包。然后,在每个节点上,您将遍历本地主集并进行更新。即使使用这种方法,您仍然最好将更新分批处理并通过 putAll 调用(或者可能使用 IgniteDataStreamer)发布它们。

注意:对于下面的示例,重要的是 "mappings" 和 "entities" 缓存中的键是相同的或位于同一位置。更多关于搭配的信息在这里: https://apacheignite.readme.io/docs/affinity-collocation

伪代码看起来像这样:

ClusterGroup cacheNodes = ignite.cluster().forCache("mappings");

IgniteCompute compute = ignite.compute(cacheNodes.nodes());

compute.broadcast(() -> {
    IgniteCache<> mappings = getCache("mappings");
    IgniteCache<> entities = getCache("entities");

    // Iterate over local primary entries.
    entities.localEntries(CachePeekMode.PRIMARY).forEach((entry) -> {
       V1 mappingVal = mappings.get(entry.getKey());
       V2 entityVal = entry.getValue();

       V2 newEntityVal = // do enrichment;

       // It would be better to create a batch, and then call putAll(...)
       // Using simple put call for simplicity.
       entities.put(entry.getKey(), newEntityVal);
    }
});