使用 CACHE_CONSUMER 调用 ActiveMQConnection 清理和更改用户信息是否会影响 DefaultMessageListenerContainer
Does calling ActiveMQConnection cleanup and changeuserinfo impact DefaultMessageListenerContainer with CACHE_CONSUMER
这里有一些背景信息:我已经设置了一个 ActiveMQBroker(版本 5.13.1)和一个 BrokerPlugin,它接受一个 requestId(只是一个 UUID,用于跨不同服务器跟踪请求)和 OAuth 令牌(更多关于下面的 OAuth 令牌) 分别在 org.apache.activemq.command.ConnectionInfo 的 'username' 和 'password' 字段中……它基于 this great post。在 client/consumer 方面,我用 cacheLeve=CACHE_CONSUMER 将 ActiveMQConnection 包装在 Spring DefaultMessageListenerContainer(版本 4.2.4-RELEASE)中(因此它还会缓存 Session 和 Consumer与连接)。
问题是客户端的 OAuth 令牌每 20 分钟过期一次,因此我在客户端设置了一个 ScheduledExecutorService 以每 18 分钟刷新一次 OAuth 令牌。
我的问题是,如果我在客户端的计划任务调用 ActiveMQConnection#cleanup() 后跟 ActiveMQConnection#changeUserInfo(newRequestId, newAuthToken) ... 是否会对 spring 的 DefaultMessageListenerContainer 产生负面影响坚持同一个 ActiveMQConnection?或另一种询问方式,我的客户端代码是否有 "right" 方式在 ActiveMQConnection 中设置新的 "username" 和 "password" 字段,而不会弄乱 DefaultMessageListenerContainer 中的任何内容?我特别关心任何多线程问题,因为 DefaultMessageListenerContainer 有多个消费者线程......当然,我的 ScheduledExecutorService 运行 它是自己的线程,用于将 OAuth 令牌更新到 ActiveMQConnection。
是否足以扩展 DefaultMessageListenerContainer 并将我对 OAuth 令牌的更新包装在 'sharedConnectionMonitor' 的同步块中,例如以下内容是否必要 and/or 就足够了:
public class OAuthMessageListenerContainer extends DefaultMessageListenerContainer {
public void updateOAuthToken(String requestId, String authToken) throws JMSException {
synchronized(sharedConnectionMonitor) {
ActiveMQConnection amqConn = (ActiveMQConnection)getSharedConnection();
amqConn.doCleanup(true);
amqConn.changeUserInfo(requestId, authToken);
}
}
}
FWIW 这是我解决问题的方法:
1) 我首先必须将 ActiveMQ 从 5.13.0 升级到 5.13.3,因为在 t运行运动被打断,例如网络故障,服务器重启,zookeeper 选择一个新的 "master" 代理(我们在服务器上使用复制的 leveldb 和 3 个代理)
2) 在解决了 ActiveMQ 中的线程死锁之后,我意识到更改 ActiveMQConnection 上的用户名和密码基本上会重置整个连接...这意味着 DMLC 已重置...所以我创建了一个子类当 t运行sport 被中断时,我可以从连接上的 T运行sportListener 重置 DMLC ...但随后 运行 进入 DMLC 中的另一个线程死锁...并发现this answer 帮助我过去了。
DMLC的子类只需要这个方法:
public void restart() {
try {
shutdown();
initialize();
start();
}
catch (Exception e) {
LOG.error("Could not restart DMLC", e);
}
}
t运行体育听众需要这样做:
@Override
public void transportInterrupted() {
dmlc.restart();
}
这里有一些背景信息:我已经设置了一个 ActiveMQBroker(版本 5.13.1)和一个 BrokerPlugin,它接受一个 requestId(只是一个 UUID,用于跨不同服务器跟踪请求)和 OAuth 令牌(更多关于下面的 OAuth 令牌) 分别在 org.apache.activemq.command.ConnectionInfo 的 'username' 和 'password' 字段中……它基于 this great post。在 client/consumer 方面,我用 cacheLeve=CACHE_CONSUMER 将 ActiveMQConnection 包装在 Spring DefaultMessageListenerContainer(版本 4.2.4-RELEASE)中(因此它还会缓存 Session 和 Consumer与连接)。
问题是客户端的 OAuth 令牌每 20 分钟过期一次,因此我在客户端设置了一个 ScheduledExecutorService 以每 18 分钟刷新一次 OAuth 令牌。
我的问题是,如果我在客户端的计划任务调用 ActiveMQConnection#cleanup() 后跟 ActiveMQConnection#changeUserInfo(newRequestId, newAuthToken) ... 是否会对 spring 的 DefaultMessageListenerContainer 产生负面影响坚持同一个 ActiveMQConnection?或另一种询问方式,我的客户端代码是否有 "right" 方式在 ActiveMQConnection 中设置新的 "username" 和 "password" 字段,而不会弄乱 DefaultMessageListenerContainer 中的任何内容?我特别关心任何多线程问题,因为 DefaultMessageListenerContainer 有多个消费者线程......当然,我的 ScheduledExecutorService 运行 它是自己的线程,用于将 OAuth 令牌更新到 ActiveMQConnection。
是否足以扩展 DefaultMessageListenerContainer 并将我对 OAuth 令牌的更新包装在 'sharedConnectionMonitor' 的同步块中,例如以下内容是否必要 and/or 就足够了:
public class OAuthMessageListenerContainer extends DefaultMessageListenerContainer {
public void updateOAuthToken(String requestId, String authToken) throws JMSException {
synchronized(sharedConnectionMonitor) {
ActiveMQConnection amqConn = (ActiveMQConnection)getSharedConnection();
amqConn.doCleanup(true);
amqConn.changeUserInfo(requestId, authToken);
}
}
}
FWIW 这是我解决问题的方法:
1) 我首先必须将 ActiveMQ 从 5.13.0 升级到 5.13.3,因为在 t运行运动被打断,例如网络故障,服务器重启,zookeeper 选择一个新的 "master" 代理(我们在服务器上使用复制的 leveldb 和 3 个代理)
2) 在解决了 ActiveMQ 中的线程死锁之后,我意识到更改 ActiveMQConnection 上的用户名和密码基本上会重置整个连接...这意味着 DMLC 已重置...所以我创建了一个子类当 t运行sport 被中断时,我可以从连接上的 T运行sportListener 重置 DMLC ...但随后 运行 进入 DMLC 中的另一个线程死锁...并发现this answer 帮助我过去了。
DMLC的子类只需要这个方法:
public void restart() {
try {
shutdown();
initialize();
start();
}
catch (Exception e) {
LOG.error("Could not restart DMLC", e);
}
}
t运行体育听众需要这样做:
@Override
public void transportInterrupted() {
dmlc.restart();
}