How can I populate a map from several threads and then iterate over it from a single background thread to send the data?
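I have the class below, in which an add method is called by another thread to populate my clientidToTimestampHolder multimap. In the same class I start a background thread that runs every 60 seconds and calls a processData() method, which iterates over the same map and sends all of that data to another service.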
public class Handler {
    private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    private final Multimap<String, Long> clientidToTimestampHolder = ArrayListMultimap.create();

    private static class Holder {
        private static final Handler INSTANCE = new Handler();
    }

    public static Handler getInstance() {
        return Holder.INSTANCE;
    }

    private Handler() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                processData();
            }
        }, 0, 60, TimeUnit.SECONDS);
    }

    // called by another thread to populate clientidToTimestampHolder map
    public void add(final String clientid, final Long timestamp) {
        clientidToTimestampHolder.put(clientid, timestamp);
    }

    // called by background thread
    public void processData() {
        for (Entry<String, Collection<Long>> entry : clientidToTimestampHolder.asMap().entrySet()) {
            String clientid = entry.getKey();
            Collection<Long> timestamps = entry.getValue();
            for (long timestamp : timestamps) {
                boolean isUpdated = isUpdatedClient(clientid, timestamp);
                if (!isUpdated) {
                    updateClient(String.valueOf(clientid));
                }
            }
        }
    }
}
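My question is: the add method will keep being called from different threads all the time. Do I need to create a copy of the clientidToTimestampHolder map and pass that copy to processData() as a parameter, rather than working directly on that map?
Right now I use the same map both to populate the data and to iterate over it when sending to the other service, and I never delete anything from it, so those entries will stay in the map forever.
What is the best way to solve this? I need to make sure it is thread-safe and free of race conditions, because I cannot lose any clientid.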
Update
So will my processData method look like this?
public void processData() {
    synchronized (clientidToTimestampHolder) {
        Iterator<Map.Entry<String, Long>> i = clientidToTimestampHolder.entries().iterator();
        while (i.hasNext()) {
            // Call next() only once per loop; calling it twice would skip entries.
            Map.Entry<String, Long> entry = i.next();
            String clientid = entry.getKey();
            long timestamp = entry.getValue();
            boolean isUpdated = isUpdatedClient(clientid, timestamp);
            if (!isUpdated) {
                updateClient(clientid);
            }
            i.remove();
        }
    }
}
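Use the Multimaps.synchronizedMultimap or Multimaps.synchronizedListMultimap wrapper to get a thread-safe reference to the multimap (ArrayListMultimap is a ListMultimap, i.e. it stores its values in lists):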
private final ListMultimap<String, Long> clientidToTimestampHolder =
        Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
Note that the synchronized multimap wrapper comes with the following caveat:
It is imperative that the user manually synchronize on the returned multimap when accessing any of its collection views:
// ...
Failure to follow this advice may result in non-deterministic behavior.
In your case you have to synchronize manually while iterating over the entries view, because its iterator is not synchronized:
public void processData() {
    synchronized (clientidToTimestampHolder) {
        for (Map.Entry<String, Long> entry : clientidToTimestampHolder.entries()) {
            String clientid = entry.getKey();
            long timestamp = entry.getValue();
            boolean isUpdated = isUpdatedClient(clientid, timestamp);
            if (!isUpdated) {
                updateClient(String.valueOf(clientid));
            }
        }
        clientidToTimestampHolder.clear();
    }
}
(I use Multimap.entries() instead of Multimap.asMap().entrySet() because it is cleaner.)
Also, if you wonder why there is no general-purpose ConcurrentXxxMultimap implementation, see Guava's issue #135 and this comment quoting an internal discussion about it:
I tried to build a general-purpose concurrent multimap, and it turned
out to be slightly faster in a small fraction of uses and Much slower
in most uses (compared to a synchronized multimap). I was focused on
making as many operations as possible atomic; a weaker contract would
eliminate some of this slowness, but would also detract from its
usefulness.
I believe the Multimap interface is too "large" to support an
efficient concurrent implementation - sorted or otherwise. (Clearly,
this is an overstatement, but at the very least it requires either a
lot of work or a loosening of the Multimap interface.)
EDIT:
Reading your comment, this looks like an XY Problem to me. That said, IMO you shouldn't use a Multimap here, since you don't use any of its features. Use a BlockingQueue instead, which has a handy drainTo(Collection) method (and is thread-safe):
private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder =
        new LinkedBlockingQueue<>();

public void add(final String clientid, final Long timestamp) {
    clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp));
}

public void processData() {
    final List<Map.Entry<String, Long>> entries = new ArrayList<>();
    clientidToTimestampHolder.drainTo(entries);
    for (Map.Entry<String, Long> entry : entries) {
        String clientid = entry.getKey();
        long timestamp = entry.getValue();
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
            updateClient(String.valueOf(clientid));
        }
    }
}
You could (should?) create your own value class for your data, storing a String field and a long field, and use it instead of the generic Map.Entry<String, Long>.
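As a minimal sketch of that suggestion, the queue-based version with a dedicated value class might look like the following. The names ClientTimestamp and PendingUpdates are hypothetical (they do not appear in the original code), and the sketch uses only the JDK, so Maps.immutableEntry is not needed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical immutable value class replacing Map.Entry<String, Long>.
final class ClientTimestamp {
    private final String clientId;
    private final long timestamp;

    ClientTimestamp(String clientId, long timestamp) {
        this.clientId = clientId;
        this.timestamp = timestamp;
    }

    String getClientId() { return clientId; }
    long getTimestamp() { return timestamp; }
}

// Hypothetical holder: producer threads call add(), the background thread calls drain().
final class PendingUpdates {
    // Created without a capacity argument, so the capacity is Integer.MAX_VALUE.
    private final BlockingQueue<ClientTimestamp> queue = new LinkedBlockingQueue<>();

    // Safe to call from any thread.
    void add(String clientId, long timestamp) {
        queue.offer(new ClientTimestamp(clientId, timestamp));
    }

    // Removes the elements queued so far and returns them in insertion order.
    // Elements added while this runs stay queued for the next call.
    List<ClientTimestamp> drain() {
        List<ClientTimestamp> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```

With this shape, processData() would call drain() and loop over the returned list, so no entry is processed twice and nothing stays behind in the shared structure.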
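Right now, with your code, you will mainly observe an inconsistent view of your map: on one iteration it may contain [1: "value1", 2: "value2", 3: "value3"], and on the next iteration it may contain something different. The main problem is that, as far as I know, Multimap does not guarantee the order in which elements are enqueued (see this post), so you could skip an element during iteration (it is up to you to decide whether that is dangerous).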
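To make the risk concrete, here is a small single-threaded sketch of one failure mode of unsynchronized iteration on a JDK hash-based collection. The class name FailFastDemo is hypothetical, and a plain java.util.HashMap stands in for the multimap. The demo is single-threaded so that it behaves deterministically; with real threads the outcome is not guaranteed, because fail-fast detection is only best effort:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

final class FailFastDemo {
    // Returns true if modifying the map during iteration was detected.
    static boolean modifyWhileIterating() {
        Map<String, Long> map = new HashMap<>();
        map.put("a", 1L);
        map.put("b", 2L);
        try {
            for (Map.Entry<String, Long> entry : map.entrySet()) {
                // Structural modification while the iterator is still in use.
                map.put("c", 3L);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("modification detected: " + modifyWhileIterating());
    }
}
```

When the modification comes from another thread instead, it may go undetected, which is why the iteration has to be protected by a lock or replaced by a thread-safe hand-off.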
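If you really need to block every put operation, you can indeed synchronize on the map in processData(), as in @Xaerxess's approach. The other possibility, which you mentioned, is defensive copying: basically, iterate over a snapshot of the Multimap. First you would write: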
public Multimap<String, Long> getClientidToTimestampHolder() {
    return ImmutableSetMultimap.copyOf(clientidToTimestampHolder);
}
The iteration is then done on this snapshot:
public void processData() {
    Multimap<String, Long> tmpClientToTimestampHolder = getClientidToTimestampHolder();
    for (Entry<String, Collection<Long>> entry : tmpClientToTimestampHolder.asMap().entrySet()) {
        String clientid = entry.getKey();
        Collection<Long> timestamps = entry.getValue();
        for (long timestamp : timestamps) {
            boolean isUpdated = isUpdatedClient(clientid, timestamp);
            if (!isUpdated) {
                updateClient(String.valueOf(clientid));
            }
        }
    }
}
Given your comment about removal, you will want a synchronized block so that it happens atomically:
synchronized (clientidToTimestampHolder) {
    // fill in key and value, or use removeAll(key)
    clientidToTimestampHolder.remove(key, value);
}
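Why is synchronization needed? Because if you want an exact view of the map at time t, you have to prevent other threads from adding elements to it. In Java this is done with locking: while one thread (here, your background thread) holds the lock on the map and reads it, no other thread that synchronizes on the same lock can access the multimap.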
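One variation on this locking idea, sketched below with JDK collections only, is to hold the lock just long enough to swap the map for a fresh one and then process the old map outside the lock. The class LockedTimestampStore, its lock field, and takeSnapshot() are hypothetical names used for illustration, and a Map<String, List<Long>> stands in for the Guava multimap:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the multimap, built on JDK collections only.
final class LockedTimestampStore {
    private final Object lock = new Object();
    private Map<String, List<Long>> pending = new HashMap<>();

    // Producers hold the lock only for the duration of one insertion.
    void add(String clientId, long timestamp) {
        synchronized (lock) {
            pending.computeIfAbsent(clientId, k -> new ArrayList<>()).add(timestamp);
        }
    }

    // The background thread swaps the map out under the lock. The returned
    // map is no longer shared, so it can be iterated without locking.
    Map<String, List<Long>> takeSnapshot() {
        synchronized (lock) {
            Map<String, List<Long>> snapshot = pending;
            pending = new HashMap<>();
            return snapshot;
        }
    }
}
```

The design choice here is that slow calls such as updateClient run on the snapshot, so they never block add(), while every entry still ends up in exactly one snapshot.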