Creating a thin MQTT publisher without using a stack (How To)
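My earlier Google research led me to this page: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
There I found some information and started experimenting. The test setup talks to a local mosquitto broker. So far this works (it runs without errors), but as soon as I send a PUBLISH message, I get disconnected from mosquitto.
[edit] So the big question is: what is wrong with the message?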
Short summary, as far as I understood:
[|] a byte with high and low 4 bits
[[|][|]] a word
{} complex byte data
Message for connect:
[1|0][0|0] … connect=1, 0 length since no data
Message for publish:
[3|0]{LEN}{DATA} ...
with {LEN}=[0xxx|xxxx] with bit7=0 … for a len <= 127 bytes
or {LEN}=[1xxx|xxxx][0xxx|xxxx] with byte 1 bit7=1 … for a len > 127 bytes and < 128^2
or {LEN}=[1xxx|xxxx][1xxx|xxxx][0xxx|xxxx] with bytes 1+2 bit7=1 … for a len >= 128^2 bytes and < 128^3
with {DATA}=[[][]]{topic}{value}
[[][]] = length of topic
{topic}=UTF-8 string (e.g. 'demo/target') as bytes
{value}=UTF-8 string (e.g. a JSON document or a simple text like 'Hello world.') as bytes
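The layout summarised above can be sketched in C# roughly as follows. This is a minimal sketch, not a full client: the class and method names (`MqttPublishSketch`, `EncodeRemainingLength`, `BuildPublish`) are made up for illustration, and it assumes QoS 0 with no DUP or RETAIN, so the first byte is always 0x30. Note that in MQTT 3.1.1 the length field carries the least significant 7 bits first, with bit 7 flagging that another byte follows.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for illustration only.
static class MqttPublishSketch
{
    // Encodes the "Remaining Length" field: 7 data bits per byte,
    // least significant group first, bit 7 set when another byte follows.
    public static byte[] EncodeRemainingLength(int length)
    {
        if (length < 0 || length > 268435455)   // 128^4 - 1, the 4-byte maximum
            throw new ArgumentOutOfRangeException(nameof(length));

        var bytes = new List<byte>();
        do
        {
            byte digit = (byte)(length % 128);
            length /= 128;
            if (length > 0) digit |= 0x80;      // continuation bit
            bytes.Add(digit);
        } while (length > 0);
        return bytes.ToArray();
    }

    // Builds a QoS 0 PUBLISH packet:
    // 0x30, Remaining Length, topic length (high byte, low byte), topic, value.
    public static byte[] BuildPublish(string topic, string value)
    {
        byte[] topicBytes = Encoding.UTF8.GetBytes(topic);
        byte[] valueBytes = Encoding.UTF8.GetBytes(value);
        if (topicBytes.Length > 65535)
            throw new ArgumentException("topic too long", nameof(topic));

        var packet = new List<byte>();
        packet.Add(0x30);                                   // PUBLISH, flags = 0
        packet.AddRange(EncodeRemainingLength(2 + topicBytes.Length + valueBytes.Length));
        packet.Add((byte)(topicBytes.Length >> 8));         // topic length, high byte
        packet.Add((byte)(topicBytes.Length & 0xFF));       // topic length, low byte
        packet.AddRange(topicBytes);
        packet.AddRange(valueBytes);
        return packet.ToArray();
    }
}
```

For example, `MqttPublishSketch.EncodeRemainingLength(321)` yields the two bytes 0xC1 0x02, and `MqttPublishSketch.BuildPublish("demo/Target", "{OFF}")` yields a 20-byte packet starting with 0x30 0x12 0x00 0x0B.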
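You may wonder why on earth I am hard-coding the implementation: it is to understand how it works. I would like to implement publishing on a robot system in a special language that allows the use of sockets to a given address and port, with connect/disconnect/send and receive commands. That is why I am trying to get it working in C# first, so that I can then port it to the robot language.
Here you can find the current code: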
using System;
using System.ComponentModel;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace TestSender
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Socket sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            System.Net.IPAddress ipAdd = System.Net.IPAddress.Parse("127.0.0.1");
            System.Net.IPEndPoint remoteEP = new IPEndPoint(ipAdd, 1883);
            Console.WriteLine(sock.Connected.ToString());
            sock.Connect(remoteEP);
            Console.WriteLine(sock.Connected.ToString());
            /*
             * control flags, bits 0...3 (only used by PUBLISH)
             * bit 0    ... RETAIN 0/1   --> 0 here
             * bit 1+2  ... QoS 0,1,2    --> 0 here
             * bit 3    ... DUP 0/1      --> 0 here
             */
            /* control message type, bits 4...7 (4 = low)
             * Hex  Name         Value  Direction                            Description
             * 0    Reserved     0      Forbidden                            Reserved
             * 1    CONNECT      1      Client to Server                     Client request to connect to Server
             * 2    CONNACK      2      Server to Client                     Connect acknowledgment
             * 3    PUBLISH      3      Client to Server / Server to Client  Publish message
             * 4    PUBACK       4      Client to Server / Server to Client  Publish acknowledgment
             * 5    PUBREC       5      Client to Server / Server to Client  Publish received (assured delivery part 1)
             * 6    PUBREL       6      Client to Server / Server to Client  Publish release (assured delivery part 2)
             * 7    PUBCOMP      7      Client to Server / Server to Client  Publish complete (assured delivery part 3)
             * 8    SUBSCRIBE    8      Client to Server                     Client subscribe request
             * 9    SUBACK       9      Server to Client                     Subscribe acknowledgment
             * A    UNSUBSCRIBE  10     Client to Server                     Unsubscribe request
             * B    UNSUBACK     11     Server to Client                     Unsubscribe acknowledgment
             * C    PINGREQ      12     Client to Server                     PING request
             * D    PINGRESP     13     Server to Client                     PING response
             * E    DISCONNECT   14     Client to Server                     Client is disconnecting
             * F    Reserved     15     Forbidden                            Reserved
             */
            byte bFlagConenct = 0;
            byte bFlagPublish = 0; //no dup, QoS (at most once = 0), no retain
            byte bMessageConnect = 1;
            byte bMessageDisConnect = 14;
            byte bMessagePublish = 3;
            byte bMessageHeaderConnect = Convert.ToByte(16 * bMessageConnect + bFlagConenct); //10h
            byte bMessageHeaderPublish = Convert.ToByte(16 * bMessagePublish + bFlagPublish); //30h
            byte bMessageHeaderDisconnect = Convert.ToByte(16 * bMessageDisConnect + bFlagConenct); //E0h
            byte[] byAdress = System.Text.Encoding.ASCII.GetBytes(@"demo/Target");
            byte[] byData = System.Text.Encoding.ASCII.GetBytes("{OFF}");
            int lenAdress = byAdress.Length;
            int lenData = byData.Length;
            //get short int, unsigned
            ushort sLenAdress = Convert.ToUInt16(lenAdress);
            ushort sLenAllData = Convert.ToUInt16(lenAdress + lenData + 2);
            byte[] byLenTopic = BitConverter.GetBytes(sLenAdress); //native byte order
            Array.Reverse(byLenTopic); //swap to High-Low (assumes a little-endian host)
            //convert 256-based ushort to 128-based ushort
            ushort sLenMQTT_128Base = 0;
            if (sLenAllData > 128)
            {
                //2nd byte needed ... add 128*256
                sLenMQTT_128Base = Convert.ToUInt16(sLenAllData + 128 * 256);
            }
            else
            {
                //only one byte
                sLenMQTT_128Base = sLenAllData;
            }
            byte[] byLenMQTT;
            if (sLenMQTT_128Base > 256)
            {
                byLenMQTT = BitConverter.GetBytes(sLenMQTT_128Base); //native byte order, not swapped
            }
            else
            {
                byte bLenMQTT = Convert.ToByte(sLenMQTT_128Base);
                byLenMQTT = new byte[1]; byLenMQTT[0] = bLenMQTT; //single length byte
            }
            byte[] BytesData_MQTT = new byte[1 + byLenMQTT.Length + byLenTopic.Length + byAdress.Length + byData.Length];
            //set message
            BytesData_MQTT[0] = bMessageHeaderPublish;
            //copy 128-based length
            Array.Copy(byLenMQTT, 0, BytesData_MQTT, 1, byLenMQTT.Length);
            //copy length of topic
            Array.Copy(byLenTopic, 0, BytesData_MQTT, 1 + byLenMQTT.Length, byLenTopic.Length);
            //copy topic
            Array.Copy(byAdress, 0, BytesData_MQTT, 1 + byLenMQTT.Length + byLenTopic.Length, byAdress.Length);
            //copy data
            Array.Copy(byData, 0, BytesData_MQTT, 1 + byLenMQTT.Length + byLenTopic.Length + byAdress.Length, byData.Length);
            /*
             * Packet:
             * Byte 1: message header
             * Byte 2-x: length of message (all data, using 128 encoding, least significant 7 bits first; 1 byte if len < 128, 2 (or more) bytes if bigger; bit #7 in a byte indicates that another byte follows)
             * Content: 2 bytes: length of the topic (H,L, 256 encoding)
             * followed by: topic (UTF-8)
             * followed by: values to be sent (UTF-8)
             */
            byte[] BytesCon_MQTT = new byte[12];
            //2-byte fixed header
            BytesCon_MQTT[0] = bMessageHeaderConnect;
            BytesCon_MQTT[1] = 10;
            //10-byte variable header
            BytesCon_MQTT[2] = 0;
            BytesCon_MQTT[3] = 4;
            BytesCon_MQTT[4] = System.Text.Encoding.ASCII.GetBytes("M")[0];
            BytesCon_MQTT[5] = System.Text.Encoding.ASCII.GetBytes("Q")[0];
            BytesCon_MQTT[6] = System.Text.Encoding.ASCII.GetBytes("T")[0];
            BytesCon_MQTT[7] = System.Text.Encoding.ASCII.GetBytes("T")[0];
            BytesCon_MQTT[8] = 4; //protocol level 0x04
            BytesCon_MQTT[9] = 0; //no user, no pw, no retain, no will qos or flag, no clean
            BytesCon_MQTT[10] = 0; //high byte, Keep Alive MSB, 0 = disabled
            BytesCon_MQTT[11] = 0; //low byte, Keep Alive LSB, 0 = disabled
            byte[] BytesDisCon_MQTT = new byte[2];
            BytesDisCon_MQTT[0] = bMessageHeaderDisconnect;
            BytesDisCon_MQTT[1] = 0;
            byte[] byInbound = new byte[1000];
            sock.Send(BytesCon_MQTT);
            Thread.Sleep(1);
            sock.Receive(byInbound);
            sock.Send(BytesData_MQTT);
            Thread.Sleep(1);
            sock.Receive(byInbound);
            sock.Send(BytesDisCon_MQTT);
            Thread.Sleep(1);
            sock.Receive(byInbound);
            Console.WriteLine(sock.Connected.ToString());
            sock.Disconnect(false);
            sock.Close();
            Console.WriteLine(sock.Connected.ToString());
        }
    }
}
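You aren't sending a complete CONNECT packet.
A CONNECT packet is not just 2 bytes: it is a 2-byte fixed header, followed by the variable header, followed by the payload.
The headers of a minimal CONNECT packet are 12 bytes long (2-byte fixed header plus 10-byte variable header), and they must then be followed by the payload, which has to contain at least the client identifier.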
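To make that structure concrete, here is a minimal sketch of a CONNECT packet for MQTT 3.1.1 that includes a payload. The names (`MqttConnectSketch`, `BuildConnect`) and the client id used in the example are assumptions for illustration. The sketch also assumes a non-empty client id short enough for a one-byte Remaining Length, and it sets the Clean Session flag so the broker does not try to resume an earlier session.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for illustration only.
static class MqttConnectSketch
{
    public static byte[] BuildConnect(string clientId, ushort keepAliveSeconds)
    {
        byte[] id = Encoding.UTF8.GetBytes(clientId);

        // 10-byte variable header + 2-byte client id length + client id
        int remaining = 10 + 2 + id.Length;
        if (id.Length == 0 || remaining > 127)
            throw new ArgumentException("sketch expects a short, non-empty client id", nameof(clientId));

        var packet = new List<byte>();
        // Fixed header
        packet.Add(0x10);                               // CONNECT, flags = 0
        packet.Add((byte)remaining);                    // Remaining Length (single byte here)
        // Variable header
        packet.AddRange(new byte[] { 0x00, 0x04, (byte)'M', (byte)'Q', (byte)'T', (byte)'T' });
        packet.Add(0x04);                               // protocol level 4 = MQTT 3.1.1
        packet.Add(0x02);                               // connect flags: Clean Session only
        packet.Add((byte)(keepAliveSeconds >> 8));      // Keep Alive MSB
        packet.Add((byte)(keepAliveSeconds & 0xFF));    // Keep Alive LSB
        // Payload: client identifier as a length-prefixed UTF-8 string
        packet.Add((byte)(id.Length >> 8));
        packet.Add((byte)(id.Length & 0xFF));
        packet.AddRange(id);
        return packet.ToArray();
    }
}
```

With a made-up client id, `MqttConnectSketch.BuildConnect("TestSender", 60)` produces 24 bytes: the 12 header bytes, then 0x00 0x0A and the ten bytes of the id.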
Even if you don't intend to use an existing MQTT client library, I'd suggest reading through one and comparing it with the spec.
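When comparing against the spec, it can also help to look at what the broker sends back after CONNECT. The following is a rough sketch under the assumption that the whole reply arrives in a single read. The names (`MqttConnAckSketch`, `DescribeConnAck`) are hypothetical, and the return codes are those listed for CONNACK in MQTT 3.1.1.

```csharp
using System;

// Hypothetical helper for illustration only.
static class MqttConnAckSketch
{
    // Interprets the first bytes read after sending CONNECT.
    // 'count' is the number of bytes actually received.
    public static string DescribeConnAck(byte[] buffer, int count)
    {
        if (count == 0) return "connection closed by broker";
        if (count < 4) return "incomplete CONNACK";
        // CONNACK: type/flags 0x20, Remaining Length 2, ack flags, return code
        if (buffer[0] != 0x20 || buffer[1] != 0x02) return "not a CONNACK";

        switch (buffer[3])
        {
            case 0: return "accepted";
            case 1: return "refused: unacceptable protocol version";
            case 2: return "refused: identifier rejected";
            case 3: return "refused: server unavailable";
            case 4: return "refused: bad user name or password";
            case 5: return "refused: not authorized";
            default: return "refused: unknown return code " + buffer[3];
        }
    }
}
```

In the question's program this could be used as `int n = sock.Receive(byInbound); Console.WriteLine(MqttConnAckSketch.DescribeConnAck(byInbound, n));` right after the CONNECT is sent. A result of 0 bytes would suggest the broker rejected the packet outright. Note also that a QoS 0 PUBLISH and a DISCONNECT are not answered by the broker, so a blocking `Receive` after either of them has nothing to read.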