使用 MQTT.js 和 Mosquitto 发布和订阅 MQTT 主题
Publish and subscribe to MQTT topics using MQTT.js and Mosquitto
我正在尝试使用 mosquitto(在 VM sudo apt-get install mosquitto
内)使用 node.js 和这个 mqtt.js
异步库来 publish/subscribe MQTT 消息:https://github.com/mqttjs/async-mqtt
在我的本地 PC 上使用 sudo apt-get install mosquitto-clients
安装 mosquitto CLI publisher/subscriber 客户端后,我知道它们可以工作,因为我可以使用这些命令成功监控 publisher/subscriber 会话:
- 订阅者:
mosquitto_sub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -v -d
- 出版商:
mosquitto_pub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d
我可以看到消息从发布者发送到订阅者,但是 当开始使用 Node.js 发送消息时,我在订阅者 CLI 会话中再也看不到它了。
- 下面的 javascript 代码有什么问题?
- 如何使用
Mqtt.js
识别具有 ID 的发布者?
- 订户如何按此 ID 进行过滤?
我假设具有不同 ID 的多个发布者可以向同一主题发送消息,并且多个订阅者可以通过来自同一主题的 ID 进行过滤。我认为这是可能的,但也许以下代码不起作用的部分原因是我需要妥善处理组合 ID/topic?
这是我执行的 mocha
规范,试图发送读数
const MQTT = require("async-mqtt");
const consoleLogError = (err) => {
if (err.response) {
console.error(`${new Date().toISOString()} HTTP response error, ${err.response.status}: ${err.response.statusText}`);
} else {
console.error(`${new Date().toISOString()} No HTTP error, the stack: ${new Error(err).stack}`);
}
};
const consoleLog = (msg) => {
console.log(`${new Date().toISOString()} ${msg}`);
};
// {"fooMetric":42.42, "created_at":"2018-12-24T10:42:08.057Z"}
const generateReadingMsg = () => {
const now = new Date();
const msg = {
"fooMetric": 42.42,
"created_at": now.toISOString()
};
consoleLog(`New generated reading: ${JSON.stringify(msg)}`);
return msg;
};
const mqttSession = {};
mqttSession.asyncInit = (hostPort, deviceId, mqttTopic) => {
return new Promise((resolve, reject) => {
mqttSession.mqttTopic = mqttTopic;
mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
keepalive: 10,
clientId: deviceId,
protocolId: 'MQTT',
clean: false,
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
rejectUnauthorized: false,
});
return resolve();
});
};
mqttSession._send = (msgStr) => {
return Promise.resolve()
.then(() => {
return mqttSession.client.publish(mqttSession.mqttTopic, msgStr);
})
.then(() => {
return mqttSession.client.end();
})
.catch((err) => {
consoleLogError(err);
throw err;
});
}
mqttSession.asyncSend = (msgJson) => {
const msgStr = JSON.stringify(msgJson);
return Promise.resolve()
.then(() => {
mqttSession.client.on("connect", () => {
return mqttSession._send(msgStr);
});
})
.catch((err) => {
consoleLogError(err);
throw err;
});
};
describe.only('MQTT readings', () => {
// for the IP address check the VM details
const vm_ip = "xxx.xxx.xxx.xxx";
beforeEach(() => {
return Promise.all([
mqttSession.asyncInit(`${vm_ip}:1883`, "fooId", "readings")
]);
});
it('should send a reading to the MQTT broker', () => {
console.log(`TODO run "mosquitto_sub -h ${vm_ip} -p 1883 -t "readings" -v -d"`);
console.log(`The following MQTT-send should be equivalent to: "mosquitto_pub -h ${vm_ip} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d"`)
return mqttSession.asyncSend(generateReadingMsg())
.then(stuff => {
console.log(`returned stuff from the MQTT session: ${stuff}`);
return Promise.resolve();
})
.catch(error => {
consoleLogError(error);
throw error;
});
});
});
首先,您可以不 识别哪个客户端在 MQTT * 协议级别发布了关于某个主题的给定消息。该信息在任何协议级别信息中都不存在。如果您需要该信息,则需要将其包含在您发送的消息的有效负载中,并在消息传递后对其进行过滤。
至于代码,您正在尝试使用 mqtts://
连接到安全的 MQTT 代理
mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
除非您在 VM 中专门配置了 Mosquitto,否则它将是 运行 端口 1883 上的正常不安全 MQTT
如果您删除 s
代码对我的代理运行良好。
mqttSession.client = MQTT.connect(`mqtt://${hostPort}`, {
* 这个 MQTT v3.x,使用新的 MQTT v5.0 规范,可以选择添加额外的元数据,但同样您将无法在订阅时过滤,仅在消息传递后过滤。
我正在尝试使用 mosquitto(在 VM sudo apt-get install mosquitto
内)使用 node.js 和这个 mqtt.js
异步库来 publish/subscribe MQTT 消息:https://github.com/mqttjs/async-mqtt
在我的本地 PC 上使用 sudo apt-get install mosquitto-clients
安装 mosquitto CLI publisher/subscriber 客户端后,我知道它们可以工作,因为我可以使用这些命令成功监控 publisher/subscriber 会话:
- 订阅者:
mosquitto_sub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -v -d
- 出版商:
mosquitto_pub -h ${MY_VM_IP_ADDRESS} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d
我可以看到消息从发布者发送到订阅者,但是 当开始使用 Node.js 发送消息时,我在订阅者 CLI 会话中再也看不到它了。
- 下面的 javascript 代码有什么问题?
- 如何使用
Mqtt.js
识别具有 ID 的发布者? - 订户如何按此 ID 进行过滤?
我假设具有不同 ID 的多个发布者可以向同一主题发送消息,并且多个订阅者可以通过来自同一主题的 ID 进行过滤。我认为这是可能的,但也许以下代码不起作用的部分原因是我需要妥善处理组合 ID/topic?
这是我执行的 mocha
规范,试图发送读数
const MQTT = require("async-mqtt");
const consoleLogError = (err) => {
if (err.response) {
console.error(`${new Date().toISOString()} HTTP response error, ${err.response.status}: ${err.response.statusText}`);
} else {
console.error(`${new Date().toISOString()} No HTTP error, the stack: ${new Error(err).stack}`);
}
};
const consoleLog = (msg) => {
console.log(`${new Date().toISOString()} ${msg}`);
};
// {"fooMetric":42.42, "created_at":"2018-12-24T10:42:08.057Z"}
const generateReadingMsg = () => {
const now = new Date();
const msg = {
"fooMetric": 42.42,
"created_at": now.toISOString()
};
consoleLog(`New generated reading: ${JSON.stringify(msg)}`);
return msg;
};
const mqttSession = {};
mqttSession.asyncInit = (hostPort, deviceId, mqttTopic) => {
return new Promise((resolve, reject) => {
mqttSession.mqttTopic = mqttTopic;
mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
keepalive: 10,
clientId: deviceId,
protocolId: 'MQTT',
clean: false,
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
rejectUnauthorized: false,
});
return resolve();
});
};
mqttSession._send = (msgStr) => {
return Promise.resolve()
.then(() => {
return mqttSession.client.publish(mqttSession.mqttTopic, msgStr);
})
.then(() => {
return mqttSession.client.end();
})
.catch((err) => {
consoleLogError(err);
throw err;
});
}
mqttSession.asyncSend = (msgJson) => {
const msgStr = JSON.stringify(msgJson);
return Promise.resolve()
.then(() => {
mqttSession.client.on("connect", () => {
return mqttSession._send(msgStr);
});
})
.catch((err) => {
consoleLogError(err);
throw err;
});
};
describe.only('MQTT readings', () => {
// for the IP address check the VM details
const vm_ip = "xxx.xxx.xxx.xxx";
beforeEach(() => {
return Promise.all([
mqttSession.asyncInit(`${vm_ip}:1883`, "fooId", "readings")
]);
});
it('should send a reading to the MQTT broker', () => {
console.log(`TODO run "mosquitto_sub -h ${vm_ip} -p 1883 -t "readings" -v -d"`);
console.log(`The following MQTT-send should be equivalent to: "mosquitto_pub -h ${vm_ip} -p 1883 -t "readings" -i foo001 -m '{"deviceId":"foo001","fooMetric":42.42}' -d"`)
return mqttSession.asyncSend(generateReadingMsg())
.then(stuff => {
console.log(`returned stuff from the MQTT session: ${stuff}`);
return Promise.resolve();
})
.catch(error => {
consoleLogError(error);
throw error;
});
});
});
首先,您可以不 识别哪个客户端在 MQTT * 协议级别发布了关于某个主题的给定消息。该信息在任何协议级别信息中都不存在。如果您需要该信息,则需要将其包含在您发送的消息的有效负载中,并在消息传递后对其进行过滤。
至于代码,您正在尝试使用 mqtts://
mqttSession.client = MQTT.connect(`mqtts://${hostPort}`, {
除非您在 VM 中专门配置了 Mosquitto,否则它将是 运行 端口 1883 上的正常不安全 MQTT
如果您删除 s
代码对我的代理运行良好。
mqttSession.client = MQTT.connect(`mqtt://${hostPort}`, {
* 这个 MQTT v3.x,使用新的 MQTT v5.0 规范,可以选择添加额外的元数据,但同样您将无法在订阅时过滤,仅在消息传递后过滤。