ConcurrentSkipListMap 如何使删除和添加调用原子化
ConcurrentSkipListMap how to make remove and add calls atomic
我有 N 个添加值的线程和一个删除线程。我正在考虑如何同步添加到现有值列表和删除列表的最佳方法。
我猜可能是以下情况:
thread 1 checked condition containsKey, and entered in else block
thread 2 removed the value
thread 1 try to add value to existing list, and get returns null
我认为我唯一可以使用的方法是通过地图值同步,在我们的例子中是添加和删除时的列表
private ConcurrentSkipListMap<LocalDateTime, List<Task>> tasks = new ConcurrentSkipListMap<>();
//Thread1,3...N
public void add(LocalDateTime time, Task task) {
if (!tasks.containsKey(time)) {
tasks.computeIfAbsent(time, k -> createValue(task));
} else {
//potentially should be synced
tasks.get(time).add(task);
}
}
private List<Task> createValue(Task val) {
return new ArrayList<>(Arrays.asList(val));
}
//thread 2
public void remove()
while(true){
Map.Entry<LocalDateTime, List<Task>> keyVal = tasks.firstEntry();
if (isSomeCondition(keyVal)) {
tasks.remove(keyVal.getKey());
for (Task t : keyVal.getValue()) {
//do task processing
}
}
}
}
关于 add
部分,您 会 真的倾向于使用 merge
,但文档对此非常清楚 - 说它不是保证以原子方式发生。
我会用 merge
替换你的 add
,但是 处于锁定状态 。
SomeLock lock ...
public void add(LocalDateTime time, Task task) {
lock.lock();
tasks.merge...
lock.unlock();
}
remove
方法也一样。但是,如果你在锁定下做事,那么首先就不需要 ConcurrentSkipListMap
。
另一方面,如果您可以更改为 ConcurrentHashMap
- 例如,它有 merge
是原子的。
不完全清楚您的 remove()
方法应该做什么。在目前的形式下,它是一个无限循环,首先,它会遍历头部元素并删除它们,直到头部元素不满足条件,然后,它会反复轮询该头部元素并重新评估条件.除非,它设法删除所有元素,在这种情况下,它会异常退出。
如果你想处理当前在map中的所有元素,你可以简单地遍历它,弱一致性迭代器允许你边修改边处理;您可能会注意到正在进行的并发更新。
如果你只想处理匹配的头元素,你必须插入一个条件,return给调用者或者让线程进入睡眠状态(或者最好添加一个通知机制),以避免用重复的失败测试燃烧 CPU(甚至在地图为空时抛出)。
除此之外,在确保函数之间没有干扰的情况下,可以使用ConcurrentSkipListMap
实现操作。假设 remove
应该处理所有当前元素一次,实现可能看起来像
public void add(LocalDateTime time, Task task) {
tasks.merge(time, Collections.singletonList(task),
(l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()));
}
public void remove() {
for(Map.Entry<LocalDateTime, List<Task>> keyVal : tasks.entrySet()) {
final List<Task> values = keyVal.getValue();
if(isSomeCondition(keyVal) && tasks.remove(keyVal.getKey(), values)) {
for (Task t : values) {
//do task processing
}
}
}
}
关键是地图中包含的列表永远不会被修改。如果没有先前的映射,merge(time, Collections.singletonList(task), …
操作甚至会存储单个任务的不可变列表。如果有以前的任务,合并函数 (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList())
将创建一个新列表而不是修改现有的列表。当列表变得更大时,这可能会对性能产生影响,尤其是在竞争的情况下必须重复操作时,但这是不需要锁定和额外同步的代价。
remove
操作使用 remove(key, value)
方法,只有当地图的值仍然匹配预期的值时才会成功。这依赖于这样一个事实,即我们的方法都不会修改映射中包含的列表,而是在合并时用新的列表实例替换它们。如果remove(key, value)
成功,则可以处理该列表;此时,它不再包含在地图中。请注意,在 isSomeCondition(keyVal)
的评估期间,列表仍包含在地图中,因此,isSomeCondition(keyVal)
不得 修改它,但我认为这应该无论如何,像 isSomeCondition
这样的测试方法就是这种情况。当然,在 isSomeCondition
中评估列表也依赖于其他从不修改列表的方法。
我有 N 个添加值的线程和一个删除线程。我正在考虑如何同步添加到现有值列表和删除列表的最佳方法。
我猜可能是以下情况:
thread 1 checked condition containsKey, and entered in else block
thread 2 removed the value
thread 1 try to add value to existing list, and get returns null
我认为我唯一可以使用的方法是通过地图值同步,在我们的例子中是添加和删除时的列表
private ConcurrentSkipListMap<LocalDateTime, List<Task>> tasks = new ConcurrentSkipListMap<>();
//Thread1,3...N
public void add(LocalDateTime time, Task task) {
if (!tasks.containsKey(time)) {
tasks.computeIfAbsent(time, k -> createValue(task));
} else {
//potentially should be synced
tasks.get(time).add(task);
}
}
private List<Task> createValue(Task val) {
return new ArrayList<>(Arrays.asList(val));
}
//thread 2
public void remove()
while(true){
Map.Entry<LocalDateTime, List<Task>> keyVal = tasks.firstEntry();
if (isSomeCondition(keyVal)) {
tasks.remove(keyVal.getKey());
for (Task t : keyVal.getValue()) {
//do task processing
}
}
}
}
关于 add
部分,您 会 真的倾向于使用 merge
,但文档对此非常清楚 - 说它不是保证以原子方式发生。
我会用 merge
替换你的 add
,但是 处于锁定状态 。
SomeLock lock ...
public void add(LocalDateTime time, Task task) {
lock.lock();
tasks.merge...
lock.unlock();
}
remove
方法也一样。但是,如果你在锁定下做事,那么首先就不需要 ConcurrentSkipListMap
。
另一方面,如果您可以更改为 ConcurrentHashMap
- 例如,它有 merge
是原子的。
不完全清楚您的 remove()
方法应该做什么。在目前的形式下,它是一个无限循环,首先,它会遍历头部元素并删除它们,直到头部元素不满足条件,然后,它会反复轮询该头部元素并重新评估条件.除非,它设法删除所有元素,在这种情况下,它会异常退出。
如果你想处理当前在map中的所有元素,你可以简单地遍历它,弱一致性迭代器允许你边修改边处理;您可能会注意到正在进行的并发更新。
如果你只想处理匹配的头元素,你必须插入一个条件,return给调用者或者让线程进入睡眠状态(或者最好添加一个通知机制),以避免用重复的失败测试燃烧 CPU(甚至在地图为空时抛出)。
除此之外,在确保函数之间没有干扰的情况下,可以使用ConcurrentSkipListMap
实现操作。假设 remove
应该处理所有当前元素一次,实现可能看起来像
public void add(LocalDateTime time, Task task) {
tasks.merge(time, Collections.singletonList(task),
(l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList()));
}
public void remove() {
for(Map.Entry<LocalDateTime, List<Task>> keyVal : tasks.entrySet()) {
final List<Task> values = keyVal.getValue();
if(isSomeCondition(keyVal) && tasks.remove(keyVal.getKey(), values)) {
for (Task t : values) {
//do task processing
}
}
}
}
关键是地图中包含的列表永远不会被修改。如果没有先前的映射,merge(time, Collections.singletonList(task), …
操作甚至会存储单个任务的不可变列表。如果有以前的任务,合并函数 (l1,l2) -> Stream.concat(l1.stream(),l2.stream()).collect(Collectors.toList())
将创建一个新列表而不是修改现有的列表。当列表变得更大时,这可能会对性能产生影响,尤其是在竞争的情况下必须重复操作时,但这是不需要锁定和额外同步的代价。
remove
操作使用 remove(key, value)
方法,只有当地图的值仍然匹配预期的值时才会成功。这依赖于这样一个事实,即我们的方法都不会修改映射中包含的列表,而是在合并时用新的列表实例替换它们。如果remove(key, value)
成功,则可以处理该列表;此时,它不再包含在地图中。请注意,在 isSomeCondition(keyVal)
的评估期间,列表仍包含在地图中,因此,isSomeCondition(keyVal)
不得 修改它,但我认为这应该无论如何,像 isSomeCondition
这样的测试方法就是这种情况。当然,在 isSomeCondition
中评估列表也依赖于其他从不修改列表的方法。