Unable to receive message sent to ActiveMQ by C# in Java
I am trying to send messages between a C# application and a Java application through Apache ActiveMQ.
C#:
using (IConnection connection = factory.CreateConnection())
using (ISession session = connection.CreateSession())
{
    IDestination destination = SessionUtil.GetDestination(session, "queue://ISI");
    // Create a producer for the queue
    using (IMessageProducer producer = session.CreateProducer(destination))
    {
        // Start the connection so that messages will be processed.
        connection.Start();
        ITextMessage request = session.CreateTextMessage(JsonConvert.SerializeObject(obj));
        /*request.NMSCorrelationID = "abc";
        request.Properties["NMSXGroupID"] = "cheese";
        request.Properties["myHeader"] = "Cheddar";*/
        producer.Send(request);
        return request;
    }
}
Java:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(isiProperties.getMqUrl());
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("ISI");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
if(message instanceof TextMessage) {
    try {
        String text = ((TextMessage) message).getText();
        ObjectMapper mapper = new ObjectMapper();
        StatusChangeMessage obj = mapper.readValue(text, StatusChangeMessage.class);
        if (obj instanceof StatusChangeMessage) {
            StatusChangeMessage received = (StatusChangeMessage) obj;
            Order order = orderRepository.findOne(received.getOrderId());
            order.setStatus(received.getStatus());
            orderRepository.saveAndFlush(order);
        }
    } catch(JMSException e) {
    } catch(IOException e) {
    }
}
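The C# application sends the message correctly (it is visible in the ActiveMQ admin console), but the queue has no active consumers (the Java application should be one). Can you see anything wrong here?
Basically, a breakpoint on the line if(message instanceof TextMessage) { is never hit.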
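I finally found the solution. It was a two-part problem. First, Windows Firewall was interfering with ActiveMQ. Second, the client libraries probably did not exactly match the server version. After downgrading the server to 5.8.0, the problem finally went away.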
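For anyone hitting the same symptom, a quick way to rule the firewall in or out is to check whether the Java machine can open a plain TCP connection to the broker port at all. The following is only a rough sketch: BrokerProbe is a hypothetical helper (not part of ActiveMQ), and it assumes a simple broker URL of the form tcp://host:port, with 61616 (ActiveMQ's default OpenWire port) used as the fallback in main. It uses nothing but the JDK.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical helper for illustration; not part of ActiveMQ or the original code.
final class BrokerProbe {

    private BrokerProbe() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unknown host: print the cause instead of swallowing it.
            System.err.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    /** Probes a simple broker URL such as "tcp://localhost:61616". */
    static boolean canConnect(String brokerUrl, int timeoutMillis) {
        URI uri = URI.create(brokerUrl);
        if (uri.getHost() == null || uri.getPort() < 0) {
            // Composite URLs such as failover:(...) are not handled by this sketch.
            throw new IllegalArgumentException("Expected tcp://host:port, got: " + brokerUrl);
        }
        return canConnect(uri.getHost(), uri.getPort(), timeoutMillis);
    }

    public static void main(String[] args) {
        // Assumed default; pass the real broker URL as the first argument.
        String url = args.length > 0 ? args[0] : "tcp://localhost:61616";
        System.out.println(url + " reachable: " + canConnect(url, 2000));
    }
}
```

If this reports the port as unreachable from the consumer's machine while the admin console works locally on the broker host, a firewall rule is a likely suspect. A successful probe only shows that the port is open; it says nothing about whether the client library and broker versions are compatible.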