这是从 Concurrent Hash Map 中提取计数而不遗漏某些计数或重复计数的正确方法吗?
Is this the correct way to extract counts from a Concurrent Hash Map without missing some or double counting?
我正在做一些事情,我试图计算某件事发生的次数。我不是用数百万次调用向数据库发送垃圾邮件,而是尝试对内存中的更新求和,然后每秒将结果转储到数据库中一次(就像将 10 +1 变成单个 +10)
我注意到一些奇怪的计数不一致(比如应该有恰好 100 万笔交易,但实际上有 1,000,016 笔或类似的东西)。
我正在调查其他可能的原因,但我想检查一下这是否是正确的处理方式。用例是它需要最终是正确的,所以只要计数没有被重复计算或丢弃就可以了。
这是我的示例实现。
public class Aggregator {
private Map<String, LongAdder> transactionsPerUser = new ConcurrentHashMap<>();
private StatisticsDAO statisticsDAO;
public Aggregator(StatisticsDAO statisticsDAO) {
this.statisticsDAO = statisticsDAO;
}
public void incrementCount(String userId) {
transactionsPerId.computeIfAbsent(userId, k -> new LongAdder()).increment();
}
@Scheduled(every = "1s")
public void sendAggregatedStatisticsToDatabase() {
for (String userId : transactionsPerUser.keySet()) {
long count = transactionsPerUser.remove(userId).sum();
statisticsDAO.updateCount(userId, count);
}
}
}
在以下情况下您将删除更新:
- 线程 A 调用 incrementCount,并为给定的 userId 找到一个已经存在的 LongAdder 实例,该实例是从 computeIfAbsent 返回的。
- 线程 B 同时处理 sendAggregatedStatisticsToDatabase 调用,该调用从映射中删除该 LongAdder 实例。
- 线程 B 在 LongAdder 实例上调用 sum()。
- 线程 A,仍在执行相同的 incrementCount 调用,现在在 LongAdder 实例上调用 increment()。
此更新现已删除。它不会在下一次调用 sendAggregatedStatisticsToDatabase 时看到,因为 increment() 调用发生在一个实例上,该实例在 incrementCount 方法中调用 computeIfAbsent() 和 increment() 之间从映射中删除。
您最好通过在 sendAggregatedStatisticsToDatabase 中执行类似的操作来重用 LongAdder 实例:
LongAdder longAdder = transactionsPerUser.get(userId);
long count = longAdder.sum();
longAdder.add(-count);
同意@NorthernSky 的回答。我的回答应该被视为问题的替代解决方案。专门解决对已接受答案的评论,称正确且高效的解决方案会更加复杂。
我建议在这里使用 producer/consumer 模式,使用无界阻塞队列。生产者调用 incrementCount()
只是将 userId 添加到队列中。
消费者被安排为每秒 运行 并将队列读入 HashMap
,然后将地图的数据推送到 DAO。
public class Aggregator {
private final Queue<String> queue = new LinkedBlockingQueue<>();
private final StatisticsDao statisticsDAO;
public Aggregator(StatisticsDao statisticsDAO) {
this.statisticsDAO = statisticsDAO;
}
public void incrementCount(String userId) {
queue.add(userId);
}
@Scheduled(every = "1s")
public void sendAggregatedStatisticsToDatabase() {
int size = queue.size();
HashMap<String, LongAdder> counts = new HashMap<>();
for (int i = 0; i < size; i++) {
counts.computeIfAbsent(queue.remove(), k -> new LongAdder()).increment();
}
counts.forEach((userId, adder) -> statisticsDAO.updateCount(userId, adder.sum()));
}
}
更好的办法是没有预定的消费者,而是不断从队列读取到本地 HashMap
直到发生超时或达到大小阈值,甚至当队列为空时.
然后它将处理当前地图并将其完全推入 DAO,清除地图并再次开始读取队列,直到下一次有足够的数据来处理。
我正在做一些事情,我试图计算某件事发生的次数。我不是用数百万次调用向数据库发送垃圾邮件,而是尝试对内存中的更新求和,然后每秒将结果转储到数据库中一次(就像将 10 +1 变成单个 +10)
我注意到一些奇怪的计数不一致(比如应该有恰好 100 万笔交易,但实际上有 1,000,016 笔或类似的东西)。
我正在调查其他可能的原因,但我想检查一下这是否是正确的处理方式。用例是它需要最终是正确的,所以只要计数没有被重复计算或丢弃就可以了。
这是我的示例实现。
public class Aggregator {
private Map<String, LongAdder> transactionsPerUser = new ConcurrentHashMap<>();
private StatisticsDAO statisticsDAO;
public Aggregator(StatisticsDAO statisticsDAO) {
this.statisticsDAO = statisticsDAO;
}
public void incrementCount(String userId) {
transactionsPerId.computeIfAbsent(userId, k -> new LongAdder()).increment();
}
@Scheduled(every = "1s")
public void sendAggregatedStatisticsToDatabase() {
for (String userId : transactionsPerUser.keySet()) {
long count = transactionsPerUser.remove(userId).sum();
statisticsDAO.updateCount(userId, count);
}
}
}
在以下情况下您将删除更新:
- 线程 A 调用 incrementCount,并为给定的 userId 找到一个已经存在的 LongAdder 实例,该实例是从 computeIfAbsent 返回的。
- 线程 B 同时处理 sendAggregatedStatisticsToDatabase 调用,该调用从映射中删除该 LongAdder 实例。
- 线程 B 在 LongAdder 实例上调用 sum()。
- 线程 A,仍在执行相同的 incrementCount 调用,现在在 LongAdder 实例上调用 increment()。
此更新现已删除。它不会在下一次调用 sendAggregatedStatisticsToDatabase 时看到,因为 increment() 调用发生在一个实例上,该实例在 incrementCount 方法中调用 computeIfAbsent() 和 increment() 之间从映射中删除。
您最好通过在 sendAggregatedStatisticsToDatabase 中执行类似的操作来重用 LongAdder 实例:
LongAdder longAdder = transactionsPerUser.get(userId);
long count = longAdder.sum();
longAdder.add(-count);
同意@NorthernSky 的回答。我的回答应该被视为问题的替代解决方案。专门解决对已接受答案的评论,称正确且高效的解决方案会更加复杂。
我建议在这里使用 producer/consumer 模式,使用无界阻塞队列。生产者调用 incrementCount()
只是将 userId 添加到队列中。
消费者被安排为每秒 运行 并将队列读入 HashMap
,然后将地图的数据推送到 DAO。
public class Aggregator {
private final Queue<String> queue = new LinkedBlockingQueue<>();
private final StatisticsDao statisticsDAO;
public Aggregator(StatisticsDao statisticsDAO) {
this.statisticsDAO = statisticsDAO;
}
public void incrementCount(String userId) {
queue.add(userId);
}
@Scheduled(every = "1s")
public void sendAggregatedStatisticsToDatabase() {
int size = queue.size();
HashMap<String, LongAdder> counts = new HashMap<>();
for (int i = 0; i < size; i++) {
counts.computeIfAbsent(queue.remove(), k -> new LongAdder()).increment();
}
counts.forEach((userId, adder) -> statisticsDAO.updateCount(userId, adder.sum()));
}
}
更好的办法是没有预定的消费者,而是不断从队列读取到本地 HashMap
直到发生超时或达到大小阈值,甚至当队列为空时.
然后它将处理当前地图并将其完全推入 DAO,清除地图并再次开始读取队列,直到下一次有足够的数据来处理。