具有自动重传问题的稳健串行协议
Robust Serial Protocol with Auto Retransmit questions
好久reader,第一次发贴。
我通过蓝牙在传感器和基站之间建立了串行 link。蓝牙 link 非常不可靠,丢包的方式令人难以置信。我正在使用它作为积极因素,并打算设计一个强大的串行协议,它可以在狗屎中生存 link。
我是办公室里唯一的嵌入式开发人员,只是想听取大家的意见。
计划使用字节填充来创建具有起始 (STX) 和结束 (ETX) 字节、索引号和 CRC 的数据包。我计划在出现 STX、ETX 和 DLE 字符时使用转义字符 (DLE)。那部分都非常清楚,这里说的代码应该做到这一点
static void SendCommand(struct usart_module * const usart_module, uint16_t cmd,
uint8_t *data, uint8_t len)
{
//[STX] ( { [IDX] [CMD_H] [CMD_L] [LEN] ...[DATA]... } [CRC] ) [ETX] // Data in {} brackets is XORed together for the CRC. // Data in () is the packet length
static uint8_t idx;
uint8_t packet[256];
uint8_t crc, packetLength, i;
if (len > 250)
return;
packetLength = len + 5;
packet[0] = idx++;
packet[1] = (cmd >> 8);
packet[2] = (cmd & 0x00FF);
packet[3] = packetLength;
crc = 0;
for (i = 0; i <= 3; i++)
{
crc ^= packet[i];
}
for (i = 0; i < len; i++)
{
packet[4 + i] = data[i];
crc ^= data[i];
}
packet[4 + len] = crc;
// Send Packet
uart_putc(usart_module, STX);
for (i = 0; i < packetLength; i++)
{
if ((packet[i] == STX) || (packet[i] == ETX) || (packet[i] == DLE))
{
uart_putc(usart_module, DLE);
}
uart_putc(usart_module, packet[i]);
}
uart_putc(usart_module, ETX);
}
所以这将发送一个数据包,但现在我需要添加一些代码来跟踪数据包并自动重新传输失败的数据包,这就是我需要一些帮助的地方。
我有几个选择;
-最简单,分配一个包含 256 个数据包的巨大数组,每个数据包都有足够的空间来存储一个数据包,并在传输数据包后将其放入缓冲区,如果我在 x 时间后没有收到 ACK,则再次传输。然后,如果我确实收到 ACK,请从数组中删除该记录,以便条目为空,这样我就知道它收到了。
问题在于,如果我使用 256 字节的最坏情况数据包大小,以及它们的 256 个实例,那就是 64K RAM 而我没有(即使我有,那也是一个糟糕的主意)
-下一个想法,创建一个所有数据包的 linked 列表并使用 malloc 命令等动态分配内存并删除那些得到确认的,并保留那些还没有的所以我知道哪个x 次后重传。
我的问题是整个 malloc 想法。说实话,我从未使用过它,而且我不喜欢在内存有限的嵌入式环境中使用它的想法。也许那只是我的愚蠢,但我觉得这为我不需要的其他问题打开了大门。
-潜在的解决方案,像上面提到的那样为所有数据包创建一个 linked 列表,但是在 fifo 中创建它们,并移动所有记录以将它们保存在 fifo 中。
例如
发送数据包 1,将数据包放入 fifo
发送数据包 2,将数据包放入 fifo
收到数据包 1 的 NACK,什么都不做
发送数据包 3,将数据包放入 fifo
收到数据包 2 的 ACK,memset 数据包 2 到 fifo
中的 0x00
收到数据包 3 的 ACK,memset 数据包 2 到 fifo 中的 0x00
发送数据包 4,将数据包放入 fifo
发送数据包 5,将数据包放入 fifo
FIFO 中没有更多空间,遍历并将所有内容洗牌到前面,因为数据包 2 和 3 现在是空的。
可以让它们实时洗牌,但是我们需要在收到每个数据包后洗牌整个 fifo,这似乎是很多不必要的工作?
我要找的是你们中的一位说 "Jeez Ned, thats not bad, but if you just do xyz, then that saves you heaps of work or RAM or complexity" 之类的。
只是希望几个人能够真正地讨论想法,因为这通常是您获得最佳解决方案的方式。我喜欢我的解决方案,但我觉得我遗漏了什么,或者可能过于复杂了?不确定...我只是对我不认为的这个想法感到 100% 满意,但只是想不出更好的方法。
您不需要使用 malloc。只需静态分配所有数据包,也许作为一个结构数组。但是不要使用数组在 运行 时间遍历数据包。相反,扩展您的 linked-list 想法。创建两个列表,一个用于等待 ACK 的数据包,另一个用于免费(即可用)数据包。在启动时,将每个数据包添加到 free-list。发送数据包时,将其从 free-list 中删除并将其添加到 awaiting-ACK 列表中。制作 awaiting-ACK 列表 doubly-linked 以便您可以从列表中间删除一个数据包并将其 return 到 free-list.
如果您的数据包大小变化很大,并且您想用更少的内存支持更多的数据包,那么您可以为不同大小的数据包创建多个 free-list。例如,max-size-free-list 包含最大的数据包,而 economy-size-free-list 包含较小的数据包。 awaiting-ACK 列表可以包含两种大小,只要它可以知道 free-list 到 return 的数据包。这可以通过在数据包结构中添加一个标志来知道。
typedef enum PacketSizeType
PACKET_SIZE_MAX = 0,
PACKET_SIZE_ECONOMY
} PacketSizeType;
typedef struct PacketBase{
PacketBase * next;
PacketBase * prev;
PacketSizeType type;
uint8_t data[1]; // a place holder (must be last)
} PacketBase;
typedef struct PacketMax
{
PacketBase base; // inherit from PacketBase (must be first)
uint8_t data[255];
} PacketMax;
typedef struct PacketEconomy
{
PacketBase base; // inherit from PacketBase (must be first)
uint8_t data[30];
} PacketEconomy;
PacketMax MaxSizedPackets[100];
PacketEconomy EconomySizedPackets[100];
Packet *free_list_max;
Packet *free_list_economy;
Packet *awaiting_ack_list;
初始化代码应循环遍历两个数组,将 base.type
成员设置为 MAX
或 ECONOMY
,并将数据包添加到适当的 free-list。传输代码从适当大小的 free-list 中获取一个数据包并将其移动到 awaiting-ACK 列表中。 awaiting-ACK 列表可以包含两种类型的数据包,因为它们都继承自 PacketBase
。 ACK 处理程序代码应检查 base.type
到 return 数据包到适当的 free-list.
一些注意事项。
- 您需要考虑订购。 out-of-order 送货是否可以接受?
- 你关心重复数据包吗?
- 丢失的 ACK 必须通过超时来检测。
我的猜测是您关心数据包的交付顺序并且不想要重复的数据包。
一个基本的方法是发送一个带有序列号的数据包,在一个时间范围内等待一个ACK。如果收到 ACK,则按顺序发送下一个数据包。如果您没有收到 ACK(考虑超时和 NAK 等效),则重新传输。重复。
当有未完成的数据包时,数据结构问题就变成了在您的应用程序中排队。
如果你想进入多个未完成的数据包,你需要一个更复杂的数据结构和一种进行选择性 ACK 的方法。那里有很多文献,但 Tannenbaum 的 "Computer Networks" 是一个不错的起点。
好久reader,第一次发贴。
我通过蓝牙在传感器和基站之间建立了串行 link。蓝牙 link 非常不可靠,丢包的方式令人难以置信。我正在使用它作为积极因素,并打算设计一个强大的串行协议,它可以在狗屎中生存 link。
我是办公室里唯一的嵌入式开发人员,只是想听取大家的意见。
计划使用字节填充来创建具有起始 (STX) 和结束 (ETX) 字节、索引号和 CRC 的数据包。我计划在出现 STX、ETX 和 DLE 字符时使用转义字符 (DLE)。那部分都非常清楚,这里说的代码应该做到这一点
static void SendCommand(struct usart_module * const usart_module, uint16_t cmd,
uint8_t *data, uint8_t len)
{
//[STX] ( { [IDX] [CMD_H] [CMD_L] [LEN] ...[DATA]... } [CRC] ) [ETX] // Data in {} brackets is XORed together for the CRC. // Data in () is the packet length
static uint8_t idx;
uint8_t packet[256];
uint8_t crc, packetLength, i;
if (len > 250)
return;
packetLength = len + 5;
packet[0] = idx++;
packet[1] = (cmd >> 8);
packet[2] = (cmd & 0x00FF);
packet[3] = packetLength;
crc = 0;
for (i = 0; i <= 3; i++)
{
crc ^= packet[i];
}
for (i = 0; i < len; i++)
{
packet[4 + i] = data[i];
crc ^= data[i];
}
packet[4 + len] = crc;
// Send Packet
uart_putc(usart_module, STX);
for (i = 0; i < packetLength; i++)
{
if ((packet[i] == STX) || (packet[i] == ETX) || (packet[i] == DLE))
{
uart_putc(usart_module, DLE);
}
uart_putc(usart_module, packet[i]);
}
uart_putc(usart_module, ETX);
}
所以这将发送一个数据包,但现在我需要添加一些代码来跟踪数据包并自动重新传输失败的数据包,这就是我需要一些帮助的地方。
我有几个选择; -最简单,分配一个包含 256 个数据包的巨大数组,每个数据包都有足够的空间来存储一个数据包,并在传输数据包后将其放入缓冲区,如果我在 x 时间后没有收到 ACK,则再次传输。然后,如果我确实收到 ACK,请从数组中删除该记录,以便条目为空,这样我就知道它收到了。
问题在于,如果我使用 256 字节的最坏情况数据包大小,以及它们的 256 个实例,那就是 64K RAM 而我没有(即使我有,那也是一个糟糕的主意)
-下一个想法,创建一个所有数据包的 linked 列表并使用 malloc 命令等动态分配内存并删除那些得到确认的,并保留那些还没有的所以我知道哪个x 次后重传。
我的问题是整个 malloc 想法。说实话,我从未使用过它,而且我不喜欢在内存有限的嵌入式环境中使用它的想法。也许那只是我的愚蠢,但我觉得这为我不需要的其他问题打开了大门。
-潜在的解决方案,像上面提到的那样为所有数据包创建一个 linked 列表,但是在 fifo 中创建它们,并移动所有记录以将它们保存在 fifo 中。
例如
发送数据包 1,将数据包放入 fifo
发送数据包 2,将数据包放入 fifo
收到数据包 1 的 NACK,什么都不做
发送数据包 3,将数据包放入 fifo
收到数据包 2 的 ACK,memset 数据包 2 到 fifo
中的 0x00
收到数据包 3 的 ACK,memset 数据包 2 到 fifo 中的 0x00
发送数据包 4,将数据包放入 fifo
发送数据包 5,将数据包放入 fifo
FIFO 中没有更多空间,遍历并将所有内容洗牌到前面,因为数据包 2 和 3 现在是空的。
可以让它们实时洗牌,但是我们需要在收到每个数据包后洗牌整个 fifo,这似乎是很多不必要的工作?
我要找的是你们中的一位说 "Jeez Ned, thats not bad, but if you just do xyz, then that saves you heaps of work or RAM or complexity" 之类的。
只是希望几个人能够真正地讨论想法,因为这通常是您获得最佳解决方案的方式。我喜欢我的解决方案,但我觉得我遗漏了什么,或者可能过于复杂了?不确定...我只是对我不认为的这个想法感到 100% 满意,但只是想不出更好的方法。
您不需要使用 malloc。只需静态分配所有数据包,也许作为一个结构数组。但是不要使用数组在 运行 时间遍历数据包。相反,扩展您的 linked-list 想法。创建两个列表,一个用于等待 ACK 的数据包,另一个用于免费(即可用)数据包。在启动时,将每个数据包添加到 free-list。发送数据包时,将其从 free-list 中删除并将其添加到 awaiting-ACK 列表中。制作 awaiting-ACK 列表 doubly-linked 以便您可以从列表中间删除一个数据包并将其 return 到 free-list.
如果您的数据包大小变化很大,并且您想用更少的内存支持更多的数据包,那么您可以为不同大小的数据包创建多个 free-list。例如,max-size-free-list 包含最大的数据包,而 economy-size-free-list 包含较小的数据包。 awaiting-ACK 列表可以包含两种大小,只要它可以知道 free-list 到 return 的数据包。这可以通过在数据包结构中添加一个标志来知道。
typedef enum PacketSizeType
PACKET_SIZE_MAX = 0,
PACKET_SIZE_ECONOMY
} PacketSizeType;
typedef struct PacketBase{
PacketBase * next;
PacketBase * prev;
PacketSizeType type;
uint8_t data[1]; // a place holder (must be last)
} PacketBase;
typedef struct PacketMax
{
PacketBase base; // inherit from PacketBase (must be first)
uint8_t data[255];
} PacketMax;
typedef struct PacketEconomy
{
PacketBase base; // inherit from PacketBase (must be first)
uint8_t data[30];
} PacketEconomy;
PacketMax MaxSizedPackets[100];
PacketEconomy EconomySizedPackets[100];
Packet *free_list_max;
Packet *free_list_economy;
Packet *awaiting_ack_list;
初始化代码应循环遍历两个数组,将 base.type
成员设置为 MAX
或 ECONOMY
,并将数据包添加到适当的 free-list。传输代码从适当大小的 free-list 中获取一个数据包并将其移动到 awaiting-ACK 列表中。 awaiting-ACK 列表可以包含两种类型的数据包,因为它们都继承自 PacketBase
。 ACK 处理程序代码应检查 base.type
到 return 数据包到适当的 free-list.
一些注意事项。
- 您需要考虑订购。 out-of-order 送货是否可以接受?
- 你关心重复数据包吗?
- 丢失的 ACK 必须通过超时来检测。
我的猜测是您关心数据包的交付顺序并且不想要重复的数据包。
一个基本的方法是发送一个带有序列号的数据包,在一个时间范围内等待一个ACK。如果收到 ACK,则按顺序发送下一个数据包。如果您没有收到 ACK(考虑超时和 NAK 等效),则重新传输。重复。
当有未完成的数据包时,数据结构问题就变成了在您的应用程序中排队。
如果你想进入多个未完成的数据包,你需要一个更复杂的数据结构和一种进行选择性 ACK 的方法。那里有很多文献,但 Tannenbaum 的 "Computer Networks" 是一个不错的起点。