How to populate entries into a map from different threads and then iterate the map and send its contents from a single background thread?

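I have the class below, with an add method that is called by other threads to populate my clientidToTimestampHolder multimap. In the same class I start a background thread that runs every 60 seconds and calls processData(), which iterates over the same map and sends all of that data to another service.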

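import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

public class Handler {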
  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
  private final Multimap<String, Long> clientidToTimestampHolder = ArrayListMultimap.create();

  private static class Holder {
    private static final Handler INSTANCE = new Handler();
  }

  public static Handler getInstance() {
    return Holder.INSTANCE;
  }

  private Handler() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        processData();
      }
    }, 0, 60, TimeUnit.SECONDS);
  }

  // called by another thread to populate clientidToTimestampHolder map
  public void add(final String clientid, final Long timestamp) {
    clientidToTimestampHolder.put(clientid, timestamp);
  }

  // called by background thread
  public void processData() {
    for (Entry<String, Collection<Long>> entry : clientidToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }
}

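My question is: the add method is called continuously, each time from a different thread. Do I need to make a copy of the clientidToTimestampHolder map and pass that copy to processData() as an argument, instead of working directly on the map?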

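Right now I use the same map both to populate data and to iterate over it when sending its contents to the other service. I never remove data from the map, so those entries will always stay in it.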

What is the best way to solve this? I also need to make sure it is thread-safe with no race conditions, because I cannot lose any clientid.

UPDATE

So would my processData method look like this?

  public void processData() {
    synchronized (clientidToTimestampHolder) {
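      Iterator<Map.Entry<String, Long>> i = clientidToTimestampHolder.entries().iterator();
      while (i.hasNext()) {
        // call next() once per loop; calling it twice would pair one entry's key
        // with the following entry's value and skip every other entry
        Map.Entry<String, Long> entry = i.next();
        String clientid = entry.getKey();
        long timestamp = entry.getValue();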
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(clientid);
        }
        i.remove();
      }
    }
  }

Use a Multimaps.synchronized(List)Multimap wrapper to get a thread-safe reference to the multimap (ArrayListMultimap is a ListMultimap, i.e. it stores the values for each key in a list):

private final ListMultimap<String, Long> clientidToTimestampHolder = 
    Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

Note that the synchronized multimap wrapper comes with the following caveat:

It is imperative that the user manually synchronize on the returned multimap when accessing any of its collection views:

// ...  

Failure to follow this advice may result in non-deterministic behavior.
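The JDK's own Collections.synchronizedList wrapper documents the same rule, so here is a JDK-only sketch of the pattern (an analogue, not Guava code; SyncIterationDemo is a hypothetical name). Single calls such as add() lock the wrapper internally, but a traversal spans many calls and must hold the wrapper's own monitor for the whole loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: JDK analogue of the synchronized-wrapper rule.
class SyncIterationDemo {
    // Every individual call (add, size, ...) on this wrapper locks the wrapper itself.
    private final List<Long> timestamps = Collections.synchronizedList(new ArrayList<Long>());

    void add(long timestamp) {
        timestamps.add(timestamp); // a single call: no explicit block needed
    }

    long sum() {
        long total = 0;
        // Iteration is many calls (hasNext/next), so hold the wrapper's monitor
        // for the whole loop; concurrent add() calls wait until it finishes.
        synchronized (timestamps) {
            for (long t : timestamps) {
                total += t;
            }
        }
        return total;
    }
}
```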

In your case you have to synchronize manually when iterating over the entries view, because its iterator is not synchronized:

public void processData() {
  synchronized (clientidToTimestampHolder) {
    for (Map.Entry<String, Long> entry : clientidToTimestampHolder.entries()) {
      String clientid = entry.getKey();
      long timestamp = entry.getValue();
      boolean isUpdated = isUpdatedClient(clientid, timestamp);
      if (!isUpdated) {
        updateClient(String.valueOf(clientid));
      }
    }
    clientidToTimestampHolder.clear();
  }
}

(I use Multimap.entries() instead of Multimap.asMap().entrySet() because it's cleaner.)
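A possible refinement, not part of the answer above: if isUpdatedClient/updateClient talk to another service, holding the lock while calling them blocks every add() caller for that whole time. The JDK-only sketch below (a HashMap of lists stands in for ArrayListMultimap; LockedTimestampBuffer and the BiConsumer sender are hypothetical) holds the lock only long enough to swap in an empty map, then processes the old one outside the lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical class: add() and processData() share one private lock.
class LockedTimestampBuffer {
    private final Object lock = new Object();
    // Guarded by lock: clientid -> timestamps (JDK stand-in for ArrayListMultimap).
    private Map<String, List<Long>> pending = new HashMap<>();

    void add(String clientid, long timestamp) {
        synchronized (lock) {
            pending.computeIfAbsent(clientid, k -> new ArrayList<>()).add(timestamp);
        }
    }

    // Swaps in an empty map under the lock, then sends the old contents without
    // holding it, so producers are blocked only for the swap. Returns the count sent.
    int processData(BiConsumer<String, Long> sender) {
        Map<String, List<Long>> batch;
        synchronized (lock) {
            batch = pending;
            pending = new HashMap<>();
        }
        int sent = 0;
        for (Map.Entry<String, List<Long>> entry : batch.entrySet()) {
            for (long timestamp : entry.getValue()) {
                sender.accept(entry.getKey(), timestamp);
                sent++;
            }
        }
        return sent;
    }
}
```

If sending can fail, the swapped-out batch would have to be put back or retried; otherwise those clientids are lost.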

Also, if you're wondering why there is no general-purpose ConcurrentXxxMultimap implementation, see Guava's issue #135 and this comment quoting internal discussion about it:

I tried to build a general-purpose concurrent multimap, and it turned out to be slightly faster in a small fraction of uses and Much slower in most uses (compared to a synchronized multimap). I was focused on making as many operations as possible atomic; a weaker contract would eliminate some of this slowness, but would also detract from its usefulness.

I believe the Multimap interface is too "large" to support an efficient concurrent implementation - sorted or otherwise. (Clearly, this is an overstatement, but at the very least it requires either a lot of work or a loosening of the Multimap interface.)

EDIT:

Reading your comments, this looks like an XY Problem to me. That said, IMO you shouldn't use a Multimap here, since you don't use any of its features; use a BlockingQueue instead, which has a handy drainTo(Collection) method (and is thread-safe):

private final LinkedBlockingQueue<Map.Entry<String, Long>> clientidToTimestampHolder =
    new LinkedBlockingQueue<>();

public void add(final String clientid, final Long timestamp) {
  clientidToTimestampHolder.offer(Maps.immutableEntry(clientid, timestamp));
}

public void processData() {
  final List<Map.Entry<String, Long>> entries = new ArrayList<>();
  clientidToTimestampHolder.drainTo(entries);
  for (Map.Entry<String, Long> entry : entries) {
    String clientid = entry.getKey();
    long timestamp = entry.getValue();
    boolean isUpdated = isUpdatedClient(clientid, timestamp);
    if (!isUpdated) {
      updateClient(String.valueOf(clientid));
    }
  }
}
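A self-contained, JDK-only version of this approach, using AbstractMap.SimpleImmutableEntry as a stand-in for Guava's Maps.immutableEntry; QueueHandler and its demo method are hypothetical names. The demo starts several producer threads, waits for them, and checks how many entries a single drain collects.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class sketching the BlockingQueue + drainTo approach.
class QueueHandler {
    // Unbounded and thread-safe: offer() never blocks and always succeeds here.
    private final LinkedBlockingQueue<Map.Entry<String, Long>> queue = new LinkedBlockingQueue<>();

    void add(String clientid, long timestamp) {
        // JDK stand-in for Guava's Maps.immutableEntry(clientid, timestamp)
        queue.offer(new AbstractMap.SimpleImmutableEntry<>(clientid, timestamp));
    }

    // Moves everything currently queued into a local list; entries offered
    // after this call stay in the queue for the next run.
    List<Map.Entry<String, Long>> drain() {
        List<Map.Entry<String, Long>> entries = new ArrayList<>();
        queue.drainTo(entries);
        return entries;
    }

    // Starts `threads` producers that each add `perThread` entries, waits for
    // all of them, then returns the number of entries one drain() collects.
    static int demo(int threads, int perThread) {
        QueueHandler handler = new QueueHandler();
        List<Thread> producers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final String clientid = "client-" + t;
            Thread p = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    handler.add(clientid, i);
                }
            });
            producers.add(p);
            p.start();
        }
        for (Thread p : producers) {
            try {
                p.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handler.drain().size();
    }
}
```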

You could (should?) create your own value class for your data, holding the String and long fields, and use it instead of the generic Map.Entry<String, Long>.
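A sketch of such a value class, written as a plain final class so it also works on older Java versions (on Java 16+ a record would do the same job); ClientTimestamp and ValueQueueHandler are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical immutable value class replacing Map.Entry<String, Long>.
final class ClientTimestamp {
    private final String clientid;
    private final long timestamp;

    ClientTimestamp(String clientid, long timestamp) {
        this.clientid = Objects.requireNonNull(clientid, "clientid");
        this.timestamp = timestamp;
    }

    String clientid() { return clientid; }

    long timestamp() { return timestamp; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ClientTimestamp)) return false;
        ClientTimestamp other = (ClientTimestamp) o;
        return timestamp == other.timestamp && clientid.equals(other.clientid);
    }

    @Override
    public int hashCode() {
        return Objects.hash(clientid, timestamp);
    }

    @Override
    public String toString() {
        return clientid + "@" + timestamp;
    }
}

// Hypothetical handler: same queue approach, typed with the value class.
class ValueQueueHandler {
    private final LinkedBlockingQueue<ClientTimestamp> queue = new LinkedBlockingQueue<>();

    void add(String clientid, long timestamp) {
        queue.offer(new ClientTimestamp(clientid, timestamp));
    }

    // Drains in FIFO order into a fresh list.
    List<ClientTimestamp> drain() {
        List<ClientTimestamp> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }
}
```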

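With your current code, the main thing you'll observe is that your map is inconsistent: in one iteration it might contain [1: "value1", 2: "value2", 3: "value3"], while in the next iteration it may hold a different set of entries. The main problem, I think, is that a Multimap does not guarantee the order in which elements were inserted (see this post), so you could skip an element during iteration (it's up to you to decide whether that is dangerous).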
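To illustrate the hazard with JDK types only (an illustration, not from the answer; FailFastDemo is a hypothetical name): the ArrayList iterator is fail-fast, so a structural change in the middle of a loop makes it throw ConcurrentModificationException on a best-effort basis. The single-threaded sketch below triggers that deterministically. When the change comes from another thread without synchronization, even the exception is not guaranteed and the outcome of the loop is unpredictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class.
class FailFastDemo {
    // Returns true if adding to the list in the middle of a for-each loop
    // made the iterator throw ConcurrentModificationException.
    static boolean modifyDuringIteration() {
        List<Long> timestamps = new ArrayList<>(Arrays.asList(1L, 2L, 3L));
        try {
            for (long t : timestamps) {
                if (t == 1L) {
                    timestamps.add(4L); // structural change while iterating
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }
}
```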

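If you really need to block every put operation while you read, you can indeed synchronize on the map in processData(), as in @Xaerxess's approach. The other option you mentioned is defensive copying, i.e. iterating over a snapshot of the Multimap. First you would do: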

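public Multimap<String, Long> getClientidToTimestampHolder(){
    // Take the copy under the same lock that add() uses: copying while another
    // thread calls put() is itself unsafe. This assumes add() also synchronizes on
    // clientidToTimestampHolder, e.g. via the Multimaps.synchronizedListMultimap wrapper.
    // (A SetMultimap copy collapses duplicate clientid/timestamp pairs.)
    synchronized (clientidToTimestampHolder) {
        return ImmutableSetMultimap.copyOf(clientidToTimestampHolder);
    }
}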

The iteration is then done on this snapshot:

 public void processData() {
    Multimap<String, Long> tmpClientToTimestampHolder = getClientidToTimestampHolder();
    for (Entry<String, Collection<Long>> entry : tmpClientToTimestampHolder.asMap().entrySet()) {
      String clientid = entry.getKey();
      Collection<Long> timestamps = entry.getValue();
      for (long timestamp : timestamps) {
        boolean isUpdated = isUpdatedClient(clientid, timestamp);
        if (!isUpdated) {
          updateClient(String.valueOf(clientid));
        }
      }
    }
  }
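The same defensive-copy idea with JDK types only, assuming a private lock shared by add() and the copy (SnapshotHolder is a hypothetical name). The copy has to be deep, because the per-client lists keep changing after the snapshot is taken; copying only the outer map would still share them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical class: snapshot under a lock, iterate the snapshot without it.
class SnapshotHolder {
    private final Object lock = new Object();
    private final Map<String, List<Long>> data = new HashMap<>(); // guarded by lock

    void add(String clientid, long timestamp) {
        synchronized (lock) {
            data.computeIfAbsent(clientid, k -> new ArrayList<>()).add(timestamp);
        }
    }

    // Deep-copies the map under the lock and returns a read-only view; callers
    // can iterate it freely while add() keeps writing to the live map.
    Map<String, List<Long>> snapshot() {
        synchronized (lock) {
            Map<String, List<Long>> copy = new HashMap<>();
            for (Map.Entry<String, List<Long>> e : data.entrySet()) {
                copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
            }
            return Collections.unmodifiableMap(copy);
        }
    }
}
```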

Regarding your comment about removal: you'll want a synchronized block so that the removal happens atomically:

synchronized (clientidToTimestampHolder) {
    clientidToTimestampHolder.remove(key, value); // fill in key and value, or use removeAll(key)
}

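Why do you need synchronization? Because if you want the exact contents of the map at time t, you have to stop other threads from adding elements to it. In Java this is done by locking: as long as one thread (here, your background thread) holds the lock on the map, other threads that also synchronize on it (as the synchronized wrapper's methods do) cannot access the multimap while you read it.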
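If entries should be removed once they are sent, one JDK-only way to combine the snapshot with the remove(key, value) idea is sketched below (ProcessedRemover and its methods are hypothetical): process a snapshot outside the lock, then remove exactly the processed (clientid, timestamp) pairs in one locked step, so pairs added in the meantime are kept for the next run.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical class: snapshot, process, then remove only what was processed.
class ProcessedRemover {
    private final Object lock = new Object();
    private final Map<String, List<Long>> data = new HashMap<>(); // guarded by lock

    void add(String clientid, long timestamp) {
        synchronized (lock) {
            data.computeIfAbsent(clientid, k -> new ArrayList<>()).add(timestamp);
        }
    }

    // Deep copy taken under the lock.
    Map<String, List<Long>> snapshot() {
        synchronized (lock) {
            Map<String, List<Long>> copy = new HashMap<>();
            data.forEach((k, v) -> copy.put(k, new ArrayList<>(v)));
            return copy;
        }
    }

    // Removes only the pairs contained in `processed`, atomically with respect
    // to add(), so pairs added after the snapshot survive.
    void removeProcessed(Map<String, List<Long>> processed) {
        synchronized (lock) {
            for (Map.Entry<String, List<Long>> e : processed.entrySet()) {
                List<Long> live = data.get(e.getKey());
                if (live == null) continue;
                for (Long ts : e.getValue()) {
                    live.remove(ts); // remove(Object): drops one occurrence of ts
                }
                if (live.isEmpty()) data.remove(e.getKey());
            }
        }
    }

    // Total number of stored (clientid, timestamp) pairs.
    int size() {
        synchronized (lock) {
            int n = 0;
            for (List<Long> v : data.values()) n += v.size();
            return n;
        }
    }
}
```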