这是从 Concurrent Hash Map 中提取计数而不遗漏某些计数或重复计数的正确方法吗?

Is this the correct way to extract counts from a Concurrent Hash Map without missing some or double counting?

我正在做一些事情,我试图计算某件事发生的次数。我不是用数百万次调用向数据库发送垃圾邮件,而是尝试对内存中的更新求和,然后每秒将结果转储到数据库中一次(就像将 10 +1 变成单个 +10)

我注意到一些奇怪的计数不一致(比如应该有恰好 100 万笔交易,但实际上有 1,000,016 笔或类似的东西)。

我正在调查其他可能的原因,但我想检查一下这是否是正确的处理方式。用例是它需要最终是正确的,所以只要计数没有被重复计算或丢弃就可以了。

这是我的示例实现。

public class Aggregator {
    private Map<String, LongAdder> transactionsPerUser = new ConcurrentHashMap<>();
    private StatisticsDAO statisticsDAO;

    public Aggregator(StatisticsDAO statisticsDAO) {
        this.statisticsDAO = statisticsDAO;
    }

    public void incrementCount(String userId) {
        transactionsPerId.computeIfAbsent(userId, k -> new LongAdder()).increment();
    }

    @Scheduled(every = "1s")
    public void sendAggregatedStatisticsToDatabase() {
        for (String userId : transactionsPerUser.keySet()) {
            long count = transactionsPerUser.remove(userId).sum();
            statisticsDAO.updateCount(userId, count);
        }
    }
}

在以下情况下您将删除更新:

  • 线程 A 调用 incrementCount,并为给定的 userId 找到一个已经存在的 LongAdder 实例,该实例是从 computeIfAbsent 返回的。
  • 线程 B 同时处理 sendAggregatedStatisticsToDatabase 调用,该调用从映射中删除该 LongAdder 实例。
  • 线程 B 在 LongAdder 实例上调用 sum()。
  • 线程 A,仍在执行相同的 incrementCount 调用,现在在 LongAdder 实例上调用 increment()。

此更新现已删除。它不会在下一次调用 sendAggregatedStatisticsToDatabase 时看到,因为 increment() 调用发生在一个实例上,该实例在 incrementCount 方法中调用 computeIfAbsent() 和 increment() 之间从映射中删除。

您最好通过在 sendAggregatedStatisticsToDatabase 中执行类似的操作来重用 LongAdder 实例:

        LongAdder longAdder = transactionsPerUser.get(userId);
        long count = longAdder.sum();
        longAdder.add(-count);

同意@NorthernSky 的回答。我的回答应该被视为问题的替代解决方案。专门解决对已接受答案的评论,称正确且高效的解决方案会更加复杂。

我建议在这里使用 producer/consumer 模式,使用无界阻塞队列。生产者调用 incrementCount() 只是将 userId 添加到队列中。

消费者被安排为每秒 运行 并将队列读入 HashMap,然后将地图的数据推送到 DAO。

public class Aggregator {
    private final Queue<String> queue = new LinkedBlockingQueue<>();
    private final StatisticsDao statisticsDAO;

    public Aggregator(StatisticsDao statisticsDAO) {
        this.statisticsDAO = statisticsDAO;
    }

    public void incrementCount(String userId) {
        queue.add(userId);
    }

    @Scheduled(every = "1s")
    public void sendAggregatedStatisticsToDatabase() {
        int size = queue.size();
        HashMap<String, LongAdder> counts = new HashMap<>();
        for (int i = 0; i < size; i++) {
            counts.computeIfAbsent(queue.remove(), k -> new LongAdder()).increment();
        }
        counts.forEach((userId, adder) -> statisticsDAO.updateCount(userId, adder.sum()));
    }
}

更好的办法是没有预定的消费者,而是不断从队列读取到本地 HashMap 直到发生超时或达到大小阈值,甚至当队列为空时. 然后它将处理当前地图并将其完全推入 DAO,清除地图并再次开始读取队列,直到下一次有足够的数据来处理。