在单个后台线程定期修改地图的同时读取地图
Concurrently reading a Map while a single background thread regularly modifies it
我有一个 class,其中我在 updateLiveSockets()
方法中每 30 秒从一个后台线程填充一个映射 liveSocketsByDatacenter
然后我有一个方法 getNextSocket()
它将被多个 reader 线程调用以获得可用的实时套接字,该套接字使用相同的映射来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}
正如你在我的 class 中看到的那样:
- 从每 30 秒运行一次的单个后台线程,我在
updateLiveSockets()
方法中使用所有活动套接字填充 liveSocketsByDatacenter
映射。
- 然后从多个线程中,我调用
getNextSocket()
方法给我一个可用的实时套接字,它使用 liveSocketsByDatacenter
映射来获取所需的信息。
我的代码工作正常,没有任何问题,想看看是否有更好或更有效的方法来编写它。我还想就线程安全问题或任何竞争条件(如果有的话)发表意见,但到目前为止我还没有看到,但我可能是错的。
我最担心的是updateLiveSockets()
方法和getLiveSocketX()
方法。我正在迭代 liveSockets
,这是 A 行 SocketHolder
的 List
,然后创建一个新的 SocketHolder
对象并添加到另一个新列表。这里可以吗?
注意: SocketHolder
是不可变的 class。你可以忽略 ZeroMQ
我有的东西。
如果 SocketHolder
和 Datacenters,
是不可变的,那么您的程序看起来不错。不过,这里有一些小的反馈。
1.原子引用的用法
AtomicReference<Map<Datacenters, List<SocketHolder>>>
liveSocketsByDatacenter
这个成员变量不需要包装在AtomicReference中。您没有对它执行任何原子 CAS 操作。您可以简单地声明一个 volative Map<Datacenters, List<SocketHolder>>
,并在读取它时,简单地创建一个对它的本地引用。这足以保证对新 Map 的引用进行原子交换。
2。同步方法
private synchronized void updateLiveSockets()
这个方法是从单线程执行器调用的,所以不需要同步。
3。一些简化
从您目前对这个 class 的使用情况来看,您似乎可以过滤掉 updateLiveSockets
中不活动的套接字,避免每次客户端调用时都进行过滤 getNextSocket
你可以替换
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS
通过 Set<Datacenters> datacenters = Utils.SERVERS.keySet()
并使用按键。
4。 Java8
如果可能,切换到 Java 8. Streams 与 Java8 的 Optional 一起删除大量样板代码并使您的代码更易于阅读。
您使用以下同步技术。
- 具有实时套接字数据的地图位于原子引用之后,这允许安全地切换地图。
updateLiveSockets()
方法是同步的(隐含在这上面),这将防止两个线程同时切换地图。
- 如果在
getNextSocket()
方法期间发生切换,则在使用地图时对地图进行本地引用以避免混淆。
像现在这样是线程安全的吗?
线程安全始终取决于是否对共享可变数据进行了适当的同步。在这种情况下,共享的可变数据是数据中心到它们的 SocketHolders 列表的映射。
地图在 AtomicReference
中,制作本地副本以供使用这一事实足以在地图上同步。您的方法采用地图的一个版本并使用它,由于 AtomicReference
的性质,切换版本是线程安全的。这也可以通过为映射 volatile
创建成员字段来实现,因为您所做的只是更新引用(您不对其进行任何先检查后操作)。
由于scheduleAtFixedRate()
保证传递的Runnable
不会与自身同时运行,因此不需要updateLiveSockets()
上的synchronized
,但是,它也不会造成任何真正的伤害。
所以是的,这个 class 是线程安全的。
但是,SocketHolder
是否可以同时被多个线程使用并不完全清楚。事实上,这个 class 只是试图通过选择一个随机的活动索引来尽量减少 SocketHolder
的并发使用(尽管不需要洗牌整个数组来选择一个随机索引)。它实际上并没有阻止并发使用。
可以提高效率吗?
我相信可以。在查看 updateLiveSockets()
方法时,它似乎构建了完全相同的地图,只是 SocketHolder
可能具有不同的 isLive
标志值。这使我得出结论,与其切换整个地图,我只想切换地图中的每个列表。为了以线程安全的方式更改映射中的条目,我可以只使用 ConcurrentHashMap
.
如果我使用 ConcurrentHashMap
,并且不切换地图,而是切换地图中的值,我可以摆脱 AtomicReference
。
要更改映射,我可以构建新列表并将其直接放入地图中。这样效率更高,因为我可以更快地发布数据,创建更少的对象,而我的同步只是建立在现成的组件上,这有利于提高可读性。
这是我的构建(为简洁起见,省略了一些不太相关的部分)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}
我有一个 class,其中我在 updateLiveSockets()
方法中每 30 秒从一个后台线程填充一个映射 liveSocketsByDatacenter
然后我有一个方法 getNextSocket()
它将被多个 reader 线程调用以获得可用的实时套接字,该套接字使用相同的映射来获取此信息。
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets();
}
}, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// The map in which I put all the live sockets
Map<Datacenters, List<SocketHolder>> updatedLiveSocketsByDatacenter = new HashMap<>();
for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH);
updatedLiveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(addedColoSockets));
}
// Update the map content
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(updatedLiveSocketsByDatacenter));
}
private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) {
List<SocketHolder> socketList = new ArrayList<>();
for (String address : addresses) {
try {
Socket client = ctx.createSocket(socketType);
// Set random identity to make tracing easier
String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
client.setIdentity(identity.getBytes(ZMQ.CHARSET));
client.setTCPKeepAlive(1);
client.setSendTimeOut(7);
client.setLinger(0);
client.connect(address);
SocketHolder zmq = new SocketHolder(client, ctx, address, true);
socketList.add(zmq);
} catch (Exception ex) {
// log error
}
}
return socketList;
}
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
// For the sake of consistency make sure to use the same map instance
// in the whole implementation of my method by getting my entries
// from the local variable instead of the member variable
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
this.liveSocketsByDatacenter.get();
Optional<SocketHolder> liveSocket = Optional.absent();
List<Datacenters> dcs = Datacenters.getOrderedDatacenters();
for (Datacenters dc : dcs) {
liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
if (liveSocket.isPresent()) {
break;
}
}
return liveSocket;
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocketX(final List<SocketHolder> endpoints) {
if (!CollectionUtils.isEmpty(endpoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(endpoints.size());
for (SocketHolder obj : endpoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
Collections.shuffle(liveOnly);
return Optional.of(liveOnly.get(0));
}
}
return Optional.absent();
}
// Added the modifier synchronized to prevent concurrent modification
// it is needed because to build the new map we first need to get the
// old one so both must be done atomically to prevent concistency issues
private synchronized void updateLiveSockets() {
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS;
// Initialize my new map with the current map content
Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter =
new HashMap<>(this.liveSocketsByDatacenter.get());
for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
// is there any problem the way I am using `SocketHolder` class?
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(),
Collections.unmodifiableList(liveUpdatedSockets));
}
this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter));
}
}
正如你在我的 class 中看到的那样:
- 从每 30 秒运行一次的单个后台线程,我在
updateLiveSockets()
方法中使用所有活动套接字填充liveSocketsByDatacenter
映射。 - 然后从多个线程中,我调用
getNextSocket()
方法给我一个可用的实时套接字,它使用liveSocketsByDatacenter
映射来获取所需的信息。
我的代码工作正常,没有任何问题,想看看是否有更好或更有效的方法来编写它。我还想就线程安全问题或任何竞争条件(如果有的话)发表意见,但到目前为止我还没有看到,但我可能是错的。
我最担心的是updateLiveSockets()
方法和getLiveSocketX()
方法。我正在迭代 liveSockets
,这是 A 行 SocketHolder
的 List
,然后创建一个新的 SocketHolder
对象并添加到另一个新列表。这里可以吗?
注意: SocketHolder
是不可变的 class。你可以忽略 ZeroMQ
我有的东西。
如果 SocketHolder
和 Datacenters,
是不可变的,那么您的程序看起来不错。不过,这里有一些小的反馈。
1.原子引用的用法
AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter
这个成员变量不需要包装在AtomicReference中。您没有对它执行任何原子 CAS 操作。您可以简单地声明一个 volative Map<Datacenters, List<SocketHolder>>
,并在读取它时,简单地创建一个对它的本地引用。这足以保证对新 Map 的引用进行原子交换。
2。同步方法
private synchronized void updateLiveSockets()
这个方法是从单线程执行器调用的,所以不需要同步。
3。一些简化
从您目前对这个 class 的使用情况来看,您似乎可以过滤掉
updateLiveSockets
中不活动的套接字,避免每次客户端调用时都进行过滤getNextSocket
你可以替换
Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS
通过Set<Datacenters> datacenters = Utils.SERVERS.keySet()
并使用按键。4。 Java8
如果可能,切换到 Java 8. Streams 与 Java8 的 Optional 一起删除大量样板代码并使您的代码更易于阅读。
您使用以下同步技术。
- 具有实时套接字数据的地图位于原子引用之后,这允许安全地切换地图。
updateLiveSockets()
方法是同步的(隐含在这上面),这将防止两个线程同时切换地图。- 如果在
getNextSocket()
方法期间发生切换,则在使用地图时对地图进行本地引用以避免混淆。
像现在这样是线程安全的吗?
线程安全始终取决于是否对共享可变数据进行了适当的同步。在这种情况下,共享的可变数据是数据中心到它们的 SocketHolders 列表的映射。
地图在 AtomicReference
中,制作本地副本以供使用这一事实足以在地图上同步。您的方法采用地图的一个版本并使用它,由于 AtomicReference
的性质,切换版本是线程安全的。这也可以通过为映射 volatile
创建成员字段来实现,因为您所做的只是更新引用(您不对其进行任何先检查后操作)。
由于scheduleAtFixedRate()
保证传递的Runnable
不会与自身同时运行,因此不需要updateLiveSockets()
上的synchronized
,但是,它也不会造成任何真正的伤害。
所以是的,这个 class 是线程安全的。
但是,SocketHolder
是否可以同时被多个线程使用并不完全清楚。事实上,这个 class 只是试图通过选择一个随机的活动索引来尽量减少 SocketHolder
的并发使用(尽管不需要洗牌整个数组来选择一个随机索引)。它实际上并没有阻止并发使用。
可以提高效率吗?
我相信可以。在查看 updateLiveSockets()
方法时,它似乎构建了完全相同的地图,只是 SocketHolder
可能具有不同的 isLive
标志值。这使我得出结论,与其切换整个地图,我只想切换地图中的每个列表。为了以线程安全的方式更改映射中的条目,我可以只使用 ConcurrentHashMap
.
如果我使用 ConcurrentHashMap
,并且不切换地图,而是切换地图中的值,我可以摆脱 AtomicReference
。
要更改映射,我可以构建新列表并将其直接放入地图中。这样效率更高,因为我可以更快地发布数据,创建更少的对象,而我的同步只是建立在现成的组件上,这有利于提高可读性。
这是我的构建(为简洁起见,省略了一些不太相关的部分)
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>(); // use ConcurrentHashMap
private final ZContext ctx = new ZContext();
// ...
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
}
// during startup, making a connection and populate once
private void connectToZMQSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); // we can put it straight into the map
}
}
// ...
// this method will be called by multiple threads to get the next live socket
// is there any concurrency or thread safety issue or race condition here?
public Optional<SocketHolder> getNextSocket() {
for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); // no more need for a local copy, ConcurrentHashMap, makes sure I get the latest mapped List<SocketHolder>
if (liveSocket.isPresent()) {
return liveSocket;
}
}
return Optional.absent();
}
// is there any concurrency or thread safety issue or race condition here?
private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
if (!CollectionUtils.isEmpty(listOfEndPoints)) {
// The list of live sockets
List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
for (SocketHolder obj : listOfEndPoints) {
if (obj.isLive()) {
liveOnly.add(obj);
}
}
if (!liveOnly.isEmpty()) {
// The list is not empty so we shuffle it an return the first element
return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
}
}
return Optional.absent();
}
// no need to make this synchronized
private void updateLiveSockets() {
Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
for (SocketHolder liveSocket : liveSockets) { // LINE A
Socket socket = liveSocket.getSocket();
String endpoint = liveSocket.getEndpoint();
Map<byte[], byte[]> holder = populateMap();
Message message = new Message(holder, Partition.COMMAND);
boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
boolean isLive = (status) ? true : false;
SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
liveUpdatedSockets.add(zmq);
}
liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); // just put it straigth into the map, the mapping will be updated in a thread safe manner.
}
}
}