无法从 Vert.x MQTT 服务器接收消息
Cannot receive messages from Vert.x MQTT Server
我正在尝试让一些基于 Paho 的客户端与 Vert.x MQTT 服务器一起工作。我正在尝试发布到我的接收客户端订阅的测试主题。我很难从我的客户发布者那里获得消息给我的客户订阅者。
结合网上看到的真实例子,我搭建了一个MQTT Broker。 Vert.x MQTT Broker 代码如下:
public class MQTTBroker
{
public MQTTBroker()
{
MqttServerOptions opts = new MqttServerOptions();
opts.setHost("localhost");
opts.setPort(1883);
MqttServer server = MqttServer.create(Vertx.vertx(),opts);
server.endpointHandler(endpoint -> {
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
endpoint.accept(false);
if("Test_Send".equals(endpoint.clientIdentifier()))
{
doPublish(endpoint);
handleClientDisconnect(endpoint);
}
else
{
handleSubscription(endpoint);
handleUnsubscription(endpoint);
handleClientDisconnect(endpoint);
}
}).listen(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
}
protected void handleSubscription(MqttEndpoint endpoint) {
endpoint.subscribeHandler(subscribe -> {
List grantedQosLevels = new ArrayList < > ();
for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
grantedQosLevels.add(s.qualityOfService());
}
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
});
}
protected void handleUnsubscription(MqttEndpoint endpoint) {
endpoint.unsubscribeHandler(unsubscribe -> {
for (String t: unsubscribe.topics()) {
System.out.println("Unsubscription for " + t);
}
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
}
protected void publishHandler(MqttEndpoint endpoint) {
endpoint.publishHandler(message -> {
endpoint.publishAcknowledge(message.messageId());
}).publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
});
}
protected void handleClientDisconnect(MqttEndpoint endpoint) {
endpoint.disconnectHandler(h -> {
System.out.println("The remote client has closed the connection.");
});
}
protected void doPublish(MqttEndpoint endpoint) {
// just as example, publish a message with QoS level 2
endpoint.publish("Test_Topic",
Buffer.buffer("Hello from the Vert.x MQTT server"),
MqttQoS.EXACTLY_ONCE,
false,
false);
publishHandler(endpoint);
// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompleteHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
});
}
}
我有一个 Paho 客户端,它应该从它订阅的主题接收消息。代码如下:
public class Receiver implements MqttCallback
{
public Receiver()
{
MqttClient client = null;
try
{
MemoryPersistence persist = new MemoryPersistence();
client = new MqttClient("tcp://localhost:1883",persist);
client.setCallback(this);
client.connect();
client.subscribe("Test_Topic");
System.out.println("The receiver is initialized.");
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
@Override
public void connectionLost(Throwable arg0)
{
System.out.println("Connection is lost!");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0)
{
System.out.println("Delivered!");
}
@Override
public void messageArrived(String theStr, MqttMessage theMsg)
{
System.out.println("Message from: "+theStr);
try
{
String str = new String(theMsg.getPayload());
System.out.println("Message is: "+str);
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
}
订阅者和发布者是本地的,但一旦我部署了我的应用程序,它们就可以(并且将会)远离 Broker。我用来发布的基于 Paho 的代码如下:
public void publish(String theMsg)
{
MqttClient nwclient = null;
try
{
ServConfigurator serv = ServConfigurator.getInstance();
MemoryPersistence persist = new MemoryPersistence();
String url = "tcp://localhost:1883";
nwclient = new MqttClient(url,"Test_Send",persist);
MqttConnectOptions option = new MqttConnectOptions();
option.setCleanSession(true);
nwclient.connect(option);
MqttMessage message = new MqttMessage(theMsg.getBytes());
message.setQos(2);
nwclient.publish("Test_Topic",message);
nwclient.disconnect();
nwclient.close();
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
请注意,我正在从 Broker 发布一个硬编码字符串(用于测试目的)(如 doPublish 方法所示),但应该注意的是,我正在捕获发布者客户端在我的端点处理程序中发布的尝试.不幸的是,虽然我可以毫无问题地将订阅者和发布者连接到代理,但由于某种原因,消息没有到达订阅者。我尝试了各种方法,但是当发布者客户端实际发布到 Broker 时,由于某种原因,Broker 的发布无法将字符串发送给订阅者。
我很确定我在这里遗漏了一些东西,但我想不出它可能是什么。任何人都可以帮助我让它工作吗???
在此先感谢您的帮助或见解。
当 MQTT 服务器收到来自发布者的消息以便调用 endpoint.publishHandler 将消息传递给它时,我在您的代码中没有看到任何获取主题和搜索订阅者的逻辑(端点)为该主题注册并向他们发送消息。
同时我没有看到您的代码以某种方式保存订阅者端点和订阅主题之间的引用以进行上述研究。
请记住,MQTT 服务器不是代理,它不会为您处理有关主题的订阅客户端列表;你可以在它上面构建一个代理,这应该是你正在尝试做的事情?
我正在尝试让一些基于 Paho 的客户端与 Vert.x MQTT 服务器一起工作。我正在尝试发布到我的接收客户端订阅的测试主题。我很难从我的客户发布者那里获得消息给我的客户订阅者。
结合网上看到的真实例子,我搭建了一个MQTT Broker。 Vert.x MQTT Broker 代码如下:
public class MQTTBroker
{
public MQTTBroker()
{
MqttServerOptions opts = new MqttServerOptions();
opts.setHost("localhost");
opts.setPort(1883);
MqttServer server = MqttServer.create(Vertx.vertx(),opts);
server.endpointHandler(endpoint -> {
System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
endpoint.accept(false);
if("Test_Send".equals(endpoint.clientIdentifier()))
{
doPublish(endpoint);
handleClientDisconnect(endpoint);
}
else
{
handleSubscription(endpoint);
handleUnsubscription(endpoint);
handleClientDisconnect(endpoint);
}
}).listen(ar -> {
if (ar.succeeded()) {
System.out.println("MQTT server is listening on port " + ar.result().actualPort());
} else {
System.out.println("Error on starting the server");
ar.cause().printStackTrace();
}
});
}
protected void handleSubscription(MqttEndpoint endpoint) {
endpoint.subscribeHandler(subscribe -> {
List grantedQosLevels = new ArrayList < > ();
for (MqttTopicSubscription s: subscribe.topicSubscriptions()) {
System.out.println("Subscription for " + s.topicName() + " with QoS " + s.qualityOfService());
grantedQosLevels.add(s.qualityOfService());
}
endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);
});
}
protected void handleUnsubscription(MqttEndpoint endpoint) {
endpoint.unsubscribeHandler(unsubscribe -> {
for (String t: unsubscribe.topics()) {
System.out.println("Unsubscription for " + t);
}
endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
});
}
protected void publishHandler(MqttEndpoint endpoint) {
endpoint.publishHandler(message -> {
endpoint.publishAcknowledge(message.messageId());
}).publishReleaseHandler(messageId -> {
endpoint.publishComplete(messageId);
});
}
protected void handleClientDisconnect(MqttEndpoint endpoint) {
endpoint.disconnectHandler(h -> {
System.out.println("The remote client has closed the connection.");
});
}
protected void doPublish(MqttEndpoint endpoint) {
// just as example, publish a message with QoS level 2
endpoint.publish("Test_Topic",
Buffer.buffer("Hello from the Vert.x MQTT server"),
MqttQoS.EXACTLY_ONCE,
false,
false);
publishHandler(endpoint);
// specifing handlers for handling QoS 1 and 2
endpoint.publishAcknowledgeHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
}).publishReceivedHandler(messageId -> {
endpoint.publishRelease(messageId);
}).publishCompleteHandler(messageId -> {
System.out.println("Received ack for message = " + messageId);
});
}
}
我有一个 Paho 客户端,它应该从它订阅的主题接收消息。代码如下:
public class Receiver implements MqttCallback
{
public Receiver()
{
MqttClient client = null;
try
{
MemoryPersistence persist = new MemoryPersistence();
client = new MqttClient("tcp://localhost:1883",persist);
client.setCallback(this);
client.connect();
client.subscribe("Test_Topic");
System.out.println("The receiver is initialized.");
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
@Override
public void connectionLost(Throwable arg0)
{
System.out.println("Connection is lost!");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0)
{
System.out.println("Delivered!");
}
@Override
public void messageArrived(String theStr, MqttMessage theMsg)
{
System.out.println("Message from: "+theStr);
try
{
String str = new String(theMsg.getPayload());
System.out.println("Message is: "+str);
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
}
订阅者和发布者是本地的,但一旦我部署了我的应用程序,它们就可以(并且将会)远离 Broker。我用来发布的基于 Paho 的代码如下:
public void publish(String theMsg)
{
MqttClient nwclient = null;
try
{
ServConfigurator serv = ServConfigurator.getInstance();
MemoryPersistence persist = new MemoryPersistence();
String url = "tcp://localhost:1883";
nwclient = new MqttClient(url,"Test_Send",persist);
MqttConnectOptions option = new MqttConnectOptions();
option.setCleanSession(true);
nwclient.connect(option);
MqttMessage message = new MqttMessage(theMsg.getBytes());
message.setQos(2);
nwclient.publish("Test_Topic",message);
nwclient.disconnect();
nwclient.close();
}
catch(Exception exe)
{
exe.printStackTrace();
}
}
请注意,我正在从 Broker 发布一个硬编码字符串(用于测试目的)(如 doPublish 方法所示),但应该注意的是,我正在捕获发布者客户端在我的端点处理程序中发布的尝试.不幸的是,虽然我可以毫无问题地将订阅者和发布者连接到代理,但由于某种原因,消息没有到达订阅者。我尝试了各种方法,但是当发布者客户端实际发布到 Broker 时,由于某种原因,Broker 的发布无法将字符串发送给订阅者。
我很确定我在这里遗漏了一些东西,但我想不出它可能是什么。任何人都可以帮助我让它工作吗???
在此先感谢您的帮助或见解。
当 MQTT 服务器收到来自发布者的消息以便调用 endpoint.publishHandler 将消息传递给它时,我在您的代码中没有看到任何获取主题和搜索订阅者的逻辑(端点)为该主题注册并向他们发送消息。 同时我没有看到您的代码以某种方式保存订阅者端点和订阅主题之间的引用以进行上述研究。 请记住,MQTT 服务器不是代理,它不会为您处理有关主题的订阅客户端列表;你可以在它上面构建一个代理,这应该是你正在尝试做的事情?