如何以原子方式在作为 ConcurrentMap 映射的一部分的 ConcurrentLinkedQueue 中添加延迟?

How to add latencies in the ConcurrentLinkedQueue which is part of ConcurrentMap map in atomic way?

我正在计算我的应用程序中的延迟(以毫秒为单位),我想将这些指标插入线程安全列表结构中。然后我将使用该列表计算平均值、中位数、第 95 个百分位数。所以我 looked 它了,我没有看到太多的列表选项,所以我决定使用 ConcurrentLinkedQueue 来存储线程安全的延迟。如果我应该为此使用任何其他更好的线程安全数据结构,请告诉我。

public class LatencyMetricHolder {
  private final ConcurrentLinkedQueue<Long> latenciesHolder = new ConcurrentLinkedQueue<>();

  private static class Holder {
    private static final LatencyMetricHolder INSTANCE = new LatencyMetricHolder();
  }

  public static LatencyMetricHolder getInstance() {
    return Holder.INSTANCE;
  }

  private LatencyMetricHolder() {}    

  public void addLatencies(long latency) {
    latenciesHolder.add(latency);
  }

  public ConcurrentLinkedQueue<Long> getLatenciesHolder() {
    return latenciesHolder;
  }
}

我正在调用 addLatencies 方法来填充多线程代码的延迟。

现在我希望每个 processId 都有 latenciesHolder,这是一个字符串。这意味着我们也可以多次获得相同的 processId,有时它会是一个新的 processId,所以不知何故,我需要为那个 processId 提取 latenciesHolder 队列并添加以线程安全方式和原子方式在特定队列上延迟。

所以我决定为此使用并发映射,如下所示,其中键为 processId:

private final Map<String, ConcurrentLinkedQueue<Long>> latenciesHolderByProcessId = Maps.newConcurrentMap();

因为我使用的是地图,所以我需要同步创建新的 ConcurrentLinkedQueue 实例,这在 Java 中有点棘手 7 我猜我在 Java 7.

在没有太多争用的情况下从多个线程以原子方式填充此映射的正确方法是什么?如果有任何方法,我想使用并发集合而不是传统锁定?

更新:

  public void add(String type, long latencyInMs) {
    ConcurrentLinkedQueue<Long> latencyHolder = latenciesHolderByProcessId.get(type);
    if (latencyHolder == null) {
      latencyHolder = Queues.newConcurrentLinkedQueue();
      ConcurrentLinkedQueue<Long> currentLatencyHolder =
          latenciesHolderByProcessId.putIfAbsent(type, latencyHolder);
      if (currentLatencyHolder != null) {
        latencyHolder = currentLatencyHolder;
      }
    }
    latencyHolder.add(latencyInMs);
  }

好的,我不确定我的所有规格是否正确,但这是一种方法:

public class LatencyMetricHolder {

 HashMap<Integer, ConcurrentLinkedQueue<Long>> map = new HashMap<>();

 public void addLatency(int processID, long latency) {
    synchronized(map) {
      ConcurrentLinkedQueue<Long> q = map.get(processID);
      if (q == null) {
        q = new ConcurrentLinkedQueue<Long>()
        map.put(processID, q);
      }
    }
    q.add(latency)
  }

  public ConcurrentLinkedQueue<Long> getLatenciesHolder(int processID) {
    synchronized(map) {
      return map.get(processId);
    }
  }
}