发送记录并等待其确认接收
Send record and wait for its acknowledgement to receive
我在下面使用 class 通过使用套接字以同步方式或异步方式将数据发送到我们的消息队列,如下所示。
sendAsync
- 它异步发送数据,没有任何超时。发送 (on LINE A)
后,它会添加到 retryHolder
存储桶中,这样如果未收到确认,它将从在构造函数中启动的后台线程再次重试。
send
- 它在内部调用 sendAsync
方法,然后休眠一段特定的超时时间,如果未收到确认,则它会从 retryHolder
存储桶中删除,这样我们就不会再试一次。
因此,上述两种方法之间的唯一区别是——对于异步,我需要不惜一切代价重试,但对于同步,我不需要重试,但看起来它可能会被重试,因为我们共享同一个重试桶缓存和重试线程每 1 秒运行一次。
ResponsePoller
是一个 class,它接收发送到我们的消息队列的数据的确认,然后调用下面的 removeFromretryHolder
方法来删除地址,这样我们就不会t 收到确认后重试。
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// retry again
for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
sendAsync(entry.getKey(), entry.getValue());
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedRecords);
// send data on a socket LINE A
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(address, encodedRecords);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// if key is not present, then acknowledgement was received successfully
sent = !retryHolder.asMap().containsKey(address);
// and key is still present in the cache, then it means acknowledgment was not received after
// waiting for timeout period, so we will remove it from cache.
if (!sent)
removeFromretryHolder(address);
return sent;
}
public void removeFromretryHolder(final long address) {
retryHolder.invalidate(address);
}
}
如果有人调用 send
方法但我们仍然需要知道是否收到确认,我们不重试的最佳方法是什么。唯一的问题是我根本不需要重试。
我们是否需要单独的存储桶来处理所有同步调用只是为了确认并且我们不从该存储桶重试?
该代码有许多潜在问题:
- 在致电
retryHolder#put
之前可能会收到答复。
- 重试消息时可能存在竞争条件。
- 如果将两封邮件发送到同一个地址,第二封会覆盖第一封吗?
- 发送总是浪费时间睡觉,改用
wait
+notify
。
我会存储具有更多状态的 class。它可以包含重试处理程序可以检查的标志 (retryIfNoAnswer
yes/no)。它可以使用 wait
/notify
提供 waitForAnswer
/markAnswerReceived
方法,这样 send 就不必在固定时间休眠。如果获得答案,waitForAnswer
方法可以 return 为真,超时则为假。在发送之前将对象放入重试处理程序并使用时间戳,以便仅重试超过特定年龄的消息。这修复了第一个竞争条件。
编辑:更新了下面的示例代码,使用您的代码编译,未测试:
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final Socket _socket;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_socket = socket;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
public boolean waitForAck() {
try {
synchronized(_monitor) {
_monitor.wait(500L);
}
return _acknowledged;
}
catch (InterruptedException e) {
return false;
}
}
public void ackReceived() {
_acknowledged = true;
synchronized(_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
public Socket getSocket() {
return _socket;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, m.getSocket());
}
else {
// Or leave the message and let send remove it
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
}
finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
}
finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
并从 ResponsePoller
调用:
SendToQueue.getInstance().handleAckReceived(addressFrom);
设计方面:我觉得你正在尝试编写一个线程安全且有点高效的 NIO 消息 sender/receiver 但是(两者)我在这里看到的代码都不好,而且不会没有重要意义变化。最好的做法是:
- 充分利用
0MQ
框架。我在这里看到的东西和期望在 ZMQ
和 java.util.concurrent
API. 中实际上是开箱即用的
- 或者看看
Netty
(https://netty.io/index.html) 如果它适用于您的项目,最好。 》Netty是一个异步事件驱动的网络应用框架
用于快速开发可维护的高性能协议服务器和客户端。” 如果您的项目变得复杂,这将节省您的时间,否则一开始可能会有点矫枉过正(但随后会出现问题......)。
但是,如果您认为自己的代码或@john 的代码快要完成了,那么我将给出完成的建议:
- 不要使用
wait()
和 notify()
。也不要sleep()
。
- 为您的 "flow tracker" 使用单个线程(即〜待处理的消息缓存)。
你实际上不需要 3 个线程来处理待处理的消息,除非这个处理本身很慢(或者做繁重的事情),这里不是这种情况,因为你基本上是进行异步调用(就它的实际情况而言)异步..是吗?)。
反向路径相同:仅当实际处理是 slow/blocking 或繁重时,才对接收到的数据包处理使用执行程序服务(多线程)。
我根本不是 0MQ
方面的专家,但就 socket.send(...)
而言是线程安全和非阻塞的(我个人不确定 - 告诉我)以上内容建议应该是正确的,让事情变得更简单。
也就是说,严格回答你的问题:
Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?
我会说不,因此您如何看待以下内容?根据您的代码并独立于我自己的感受,这似乎是可以接受的:
public class SendToQueue {
// ...
private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
// ...
private void startTransaction(long address) {
this.transactions.put(address, Boolean.FALSE);
}
public void updateTransaction(long address) {
Boolean state = this.transactions.get(address);
if (state != null) {
this.transactions.put(address, Boolean.TRUE);
}
}
private void clearTransaction(long address) {
this.transactions.remove(address);
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean success = false;
// If address is enough randomized or atomically counted (then ok for parallel send())
startTransaction(address);
try {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
// wait for acknowledgement
success = waitDoneUntil(new DoneCondition() {
@Override
public boolean isDone() {
return SendToQueue.this.transactions.get(address); // no NPE
}
}, 500, TimeUnit.MILLISECONDS);
if (success) {
// Message acknowledged!
}
}
} finally {
clearTransaction(address);
}
return success;
}
public static interface DoneCondition {
public boolean isDone();
}
/**
* WaitDoneUntil(Future f, int duration, TimeUnit unit). Note: includes a
* sleep(50).
*
* @param f Will block for this future done until maxWaitMillis
* @param waitTime Duration expressed in (time) unit.
* @param unit Time unit.
* @return DoneCondition finally met or not
*/
public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
long curMillis = 0;
long maxWaitMillis = unit.toMillis(waitTime);
while (!f.isDone() && curMillis < maxWaitMillis) {
try {
Thread.sleep(50); // define your step here accordingly or set as parameter
} catch (InterruptedException ex1) {
//logger.debug("waitDoneUntil() interrupted.");
break;
}
curMillis += 50L;
}
return f.isDone();
}
//...
}
public class ResponsePoller {
//...
public void onReceive(long address) { // sample prototype
// ...
SendToQueue.getInstance().updateTransaction(address);
// The interested sender will know that its transaction is complete.
// While subsequent (late) calls will have no effect.
}
}
我在下面使用 class 通过使用套接字以同步方式或异步方式将数据发送到我们的消息队列,如下所示。
sendAsync
- 它异步发送数据,没有任何超时。发送(on LINE A)
后,它会添加到retryHolder
存储桶中,这样如果未收到确认,它将从在构造函数中启动的后台线程再次重试。send
- 它在内部调用sendAsync
方法,然后休眠一段特定的超时时间,如果未收到确认,则它会从retryHolder
存储桶中删除,这样我们就不会再试一次。
因此,上述两种方法之间的唯一区别是——对于异步,我需要不惜一切代价重试,但对于同步,我不需要重试,但看起来它可能会被重试,因为我们共享同一个重试桶缓存和重试线程每 1 秒运行一次。
ResponsePoller
是一个 class,它接收发送到我们的消息队列的数据的确认,然后调用下面的 removeFromretryHolder
方法来删除地址,这样我们就不会t 收到确认后重试。
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// retry again
for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
sendAsync(entry.getKey(), entry.getValue());
}
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(encodedRecords);
// send data on a socket LINE A
boolean sent = msg.send(socket);
msg.destroy();
retryHolder.put(address, encodedRecords);
return sent;
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
// if key is not present, then acknowledgement was received successfully
sent = !retryHolder.asMap().containsKey(address);
// and key is still present in the cache, then it means acknowledgment was not received after
// waiting for timeout period, so we will remove it from cache.
if (!sent)
removeFromretryHolder(address);
return sent;
}
public void removeFromretryHolder(final long address) {
retryHolder.invalidate(address);
}
}
如果有人调用 send
方法但我们仍然需要知道是否收到确认,我们不重试的最佳方法是什么。唯一的问题是我根本不需要重试。
我们是否需要单独的存储桶来处理所有同步调用只是为了确认并且我们不从该存储桶重试?
该代码有许多潜在问题:
- 在致电
retryHolder#put
之前可能会收到答复。 - 重试消息时可能存在竞争条件。
- 如果将两封邮件发送到同一个地址,第二封会覆盖第一封吗?
- 发送总是浪费时间睡觉,改用
wait
+notify
。
我会存储具有更多状态的 class。它可以包含重试处理程序可以检查的标志 (retryIfNoAnswer
yes/no)。它可以使用 wait
/notify
提供 waitForAnswer
/markAnswerReceived
方法,这样 send 就不必在固定时间休眠。如果获得答案,waitForAnswer
方法可以 return 为真,超时则为假。在发送之前将对象放入重试处理程序并使用时间戳,以便仅重试超过特定年龄的消息。这修复了第一个竞争条件。
编辑:更新了下面的示例代码,使用您的代码编译,未测试:
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class PendingMessage {
private final long _address;
private final byte[] _encodedRecords;
private final Socket _socket;
private final boolean _retryEnabled;
private final Object _monitor = new Object();
private long _sendTimeMillis;
private volatile boolean _acknowledged;
public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
_address = address;
_sendTimeMillis = System.currentTimeMillis();
_encodedRecords = encodedRecords;
_socket = socket;
_retryEnabled = retryEnabled;
}
public synchronized boolean hasExpired() {
return System.currentTimeMillis() - _sendTimeMillis > 500L;
}
public synchronized void markResent() {
_sendTimeMillis = System.currentTimeMillis();
}
public boolean shouldRetry() {
return _retryEnabled && !_acknowledged;
}
public boolean waitForAck() {
try {
synchronized(_monitor) {
_monitor.wait(500L);
}
return _acknowledged;
}
catch (InterruptedException e) {
return false;
}
}
public void ackReceived() {
_acknowledged = true;
synchronized(_monitor) {
_monitor.notifyAll();
}
}
public long getAddress() {
return _address;
}
public byte[] getEncodedRecords() {
return _encodedRecords;
}
public Socket getSocket() {
return _socket;
}
}
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private void handleRetries() {
List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
for (PendingMessage m : messages) {
if (m.hasExpired()) {
if (m.shouldRetry()) {
m.markResent();
doSendAsync(m, m.getSocket());
}
else {
// Or leave the message and let send remove it
cache.invalidate(m.getAddress());
}
}
}
}
private SendToQueue() {
executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
handleRetries();
}
}, 0, 1, TimeUnit.SECONDS);
}
public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
cache.put(address, m);
return doSendAsync(m, socket);
}
private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
ZMsg msg = new ZMsg();
msg.add(pendingMessage.getEncodedRecords());
try {
// send data on a socket LINE A
return msg.send(socket);
}
finally {
msg.destroy();
}
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
cache.put(address, m);
try {
if (doSendAsync(m, socket)) {
return m.waitForAck();
}
return false;
}
finally {
// Alternatively (checks that address points to m):
// cache.asMap().remove(address, m);
cache.invalidate(address);
}
}
public void handleAckReceived(final long address) {
PendingMessage m = cache.getIfPresent(address);
if (m != null) {
m.ackReceived();
cache.invalidate(address);
}
}
}
并从 ResponsePoller
调用:
SendToQueue.getInstance().handleAckReceived(addressFrom);
设计方面:我觉得你正在尝试编写一个线程安全且有点高效的 NIO 消息 sender/receiver 但是(两者)我在这里看到的代码都不好,而且不会没有重要意义变化。最好的做法是:
- 充分利用
0MQ
框架。我在这里看到的东西和期望在ZMQ
和java.util.concurrent
API. 中实际上是开箱即用的
- 或者看看
Netty
(https://netty.io/index.html) 如果它适用于您的项目,最好。 》Netty是一个异步事件驱动的网络应用框架 用于快速开发可维护的高性能协议服务器和客户端。” 如果您的项目变得复杂,这将节省您的时间,否则一开始可能会有点矫枉过正(但随后会出现问题......)。
但是,如果您认为自己的代码或@john 的代码快要完成了,那么我将给出完成的建议:
- 不要使用
wait()
和notify()
。也不要sleep()
。 - 为您的 "flow tracker" 使用单个线程(即〜待处理的消息缓存)。
你实际上不需要 3 个线程来处理待处理的消息,除非这个处理本身很慢(或者做繁重的事情),这里不是这种情况,因为你基本上是进行异步调用(就它的实际情况而言)异步..是吗?)。
反向路径相同:仅当实际处理是 slow/blocking 或繁重时,才对接收到的数据包处理使用执行程序服务(多线程)。
我根本不是 0MQ
方面的专家,但就 socket.send(...)
而言是线程安全和非阻塞的(我个人不确定 - 告诉我)以上内容建议应该是正确的,让事情变得更简单。
也就是说,严格回答你的问题:
Do we need separate bucket for all the sync calls just for acknowledgement and we dont retry from that bucket?
我会说不,因此您如何看待以下内容?根据您的代码并独立于我自己的感受,这似乎是可以接受的:
public class SendToQueue {
// ...
private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
// ...
private void startTransaction(long address) {
this.transactions.put(address, Boolean.FALSE);
}
public void updateTransaction(long address) {
Boolean state = this.transactions.get(address);
if (state != null) {
this.transactions.put(address, Boolean.TRUE);
}
}
private void clearTransaction(long address) {
this.transactions.remove(address);
}
public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
boolean success = false;
// If address is enough randomized or atomically counted (then ok for parallel send())
startTransaction(address);
try {
boolean sent = sendAsync(address, encodedRecords, socket);
// if the record was sent successfully, then only sleep for timeout period
if (sent) {
// wait for acknowledgement
success = waitDoneUntil(new DoneCondition() {
@Override
public boolean isDone() {
return SendToQueue.this.transactions.get(address); // no NPE
}
}, 500, TimeUnit.MILLISECONDS);
if (success) {
// Message acknowledged!
}
}
} finally {
clearTransaction(address);
}
return success;
}
public static interface DoneCondition {
public boolean isDone();
}
/**
* WaitDoneUntil(Future f, int duration, TimeUnit unit). Note: includes a
* sleep(50).
*
* @param f Will block for this future done until maxWaitMillis
* @param waitTime Duration expressed in (time) unit.
* @param unit Time unit.
* @return DoneCondition finally met or not
*/
public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
long curMillis = 0;
long maxWaitMillis = unit.toMillis(waitTime);
while (!f.isDone() && curMillis < maxWaitMillis) {
try {
Thread.sleep(50); // define your step here accordingly or set as parameter
} catch (InterruptedException ex1) {
//logger.debug("waitDoneUntil() interrupted.");
break;
}
curMillis += 50L;
}
return f.isDone();
}
//...
}
public class ResponsePoller {
//...
public void onReceive(long address) { // sample prototype
// ...
SendToQueue.getInstance().updateTransaction(address);
// The interested sender will know that its transaction is complete.
// While subsequent (late) calls will have no effect.
}
}