流缓冲区(Stream Buffer)是 FreeRTOS10.0版本中新增的一种进程间通信机制 。它专为只有一个写入者和一个读取者的应用场景而设计优化,能够处理并传递任意长度的字节数据流 。
流缓冲区与队列的异同
队列在创建时需设定固定的数据项格式和类型,且每次执行写入或读取操作时都针对一个完整的项 。流缓冲区中的数据只能是无起始符和结束符的字节流,且其每次写入和读出的数据长度是完全任意的 。
流缓冲区特别适用于单个写入者与单个读取者的数据交互场景,进程身份可以是中断服务函数(ISR)或常规任务 。常见的应用场景包括从ISR向任务单向传递数据,或在多核处理器架构下实现不同内核间的数据通信 。
若实际系统设计必须引入多个写入者或多个读取者操作同一个流缓冲区,相关调用代码必须被严格置于临界代码段内以禁止任务调度 。此外,在调用读写API函数时,必须将阻塞时间参数强制设置为零 。
流缓冲区在进程间传递数据时采用内存复制的方式,即写入者将内存数据复制进入流缓冲区,读取者再从缓冲区中将数据复制回本地 。其底层利用了任务通知技术,当任务因操作流缓冲区进入阻塞状态时,系统会相应改变该任务的通知状态和通知值 。
触发水平是指当读取者在阻塞状态下等待数据时,流缓冲区内的数据量必须达到的特定字节数阈值 。只有当缓冲区内积攒的数据字节数达到或超过设定的触发水平参数时,系统才会解除读取任务的阻塞状态 。
在 CubeMX 中没有关于流缓冲区的任何设置,想要使用流缓冲区,必须在文件中包含
stream_buffer.h。
xStreamBufferCreate该函数没有对应的中断版本。
//这是一个宏函数,它调用的原函数可以用来创建流冲区和消息缓冲区。
#define xStreamBufferCreate( xBufferSizeBytes, xTriggerLevelBytes ) xStreamBufferGenericCreate( xBufferSizeBytes, xTriggerLevelBytes, pdFALSE )
StreamBufferHandle_t xStreamBufferCreate(
size_t xBufferSizeBytes, // 缓冲区的总容量(以字节为单位)
size_t xTriggerLevelBytes // 触发水平(当缓冲区内积累了多少字节数据时,才会唤醒阻塞等待读取的任务)
);
//对应的静态创建方法为xStreamBufferCreateStatic。后续不再赘述。
StreamBufferHandle_t xStreamBufferCreateStatic(
size_t xBufferSizeBytes, // 缓冲区的总字节大小(数据存储区的容量)
size_t xTriggerLevelBytes, // 触发水平(当缓冲区内积累了多少字节数据时,才会唤醒正在阻塞等待读取的任务)
uint8_t *pucStreamBufferStorageArea, // 数据存储区指针(指向用户提前定义好的、用于存放实际字节流的数组)
StaticStreamBuffer_t *pxStaticStreamBuffer // 控制块指针(指向用户提前定义好的流缓冲区控制块结构体)
);
使用 xStreamBufferCreate 函数可以动态创建一个流缓冲区对象,在调用时需传入设定的缓冲区总字节容量以及触发水平参数,该函数会返回一个代表该缓冲区的句柄指针 。
如果将触发水平设置为 0 的话会等效为 1 。
由于 FreeRTOS 环形缓冲区的底层设计,流缓冲区需要使用 1 个字节 来区分“缓冲区已满”和“缓冲区为空”的状态。
因此,如果你希望你的流缓冲区能够实际容纳 N 个字节的数据,你传入的 pucStreamBufferStorageArea 数组的长度必须至少是 N + 1 字节,传入的 xBufferSizeBytes 也应该是 N + 1。
动态分配时系统会自动加一,只需直接传入你需要的字节数就好。
xStreamBufferSetTriggerLevel该函数没有对应的中断版本。
BaseType_t xStreamBufferSetTriggerLevel(
StreamBufferHandle_t xStreamBuffer, // 目标流缓冲区的句柄
size_t xTriggerLevel // 新的触发水平(必须大于 0,且小于等于缓冲区总容量)
);使用 xStreamBufferSetTriggerLevel 函数可以动态更新已创建流缓冲区的触发水平参数 。传入的全新触发水平值必须小于或等于流缓冲区的总容量长度,函数才会更新成功并返回布尔真值 。
当设置新触发值后,如果缓冲区内数据量已经超过刚刚设置触发值,该函数在退出前会唤醒对应的消费者线程。
设计一个协议包,包头固定(假设为 5 字节),载荷可变。
初始状态时可以先将触发水平设置为 5,等到接受完毕包头、解析完载荷长度时再将触发水平设置为载荷长度。这样做可以避免频繁的无效唤醒导致浪费 CPU 时间。
#define HEADER_SIZE 5
#define MAX_PAYLOAD_SIZE 128
#define SYNC_BYTE_1 0xAA // 假设协议以 AA 55 开头
#define SYNC_BYTE_2 0x55
void vProtocolParseTask(void *pvParameters)
{
uint8_t ucHeader[HEADER_SIZE];
uint8_t ucPayload[MAX_PAYLOAD_SIZE];
size_t xReceivedBytes;
uint8_t ucPayloadLength = 0;
// 初始化:确保最开始的触发水平是包头长度
xStreamBufferSetTriggerLevel(xMyStreamBuffer, HEADER_SIZE);
for (;;)
{
// ==========================================
// 阶段 1:等待并读取包头
// ==========================================
xReceivedBytes = xStreamBufferReceive(xMyStreamBuffer, ucHeader, HEADER_SIZE, portMAX_DELAY);
if (xReceivedBytes == HEADER_SIZE)
{
// 校验包头同步字 (防错位机制)
if (ucHeader == SYNC_BYTE_1 && ucHeader == SYNC_BYTE_2)
{
// 假设第 5 个字节 (下标 4) 是载荷长度
ucPayloadLength = ucHeader;
// 安全检查:防止长度字段被篡改导致越界
if (ucPayloadLength > 0 && ucPayloadLength <= MAX_PAYLOAD_SIZE)
{
// 动态调整触发水平为载荷长度
xStreamBufferSetTriggerLevel(xMyStreamBuffer, ucPayloadLength);
// ==========================================
// 阶段 2:等待并读取载荷
// ==========================================
// 这里加入超时时间(如 1000ms),防止包头对了但后面的数据永远不来,导致死锁
xReceivedBytes = xStreamBufferReceive(xMyStreamBuffer, ucPayload, ucPayloadLength, pdMS_TO_TICKS(1000));
if (xReceivedBytes == ucPayloadLength)
{
// 成功!完整的包已收到,去处理业务逻辑...
ProcessPayload(ucPayload, ucPayloadLength);
}
else
{
// 错误:载荷接收超时!说明发生丢包,直接丢弃本次数据包
}
}
else if (ucPayloadLength == 0)
{
// 特殊情况:这是一个没有载荷的心跳包/指令包,直接处理
ProcessEmptyPacket();
}
// (如果超长则丢弃,属于无效包)
}
else
{
// 【精髓:错位恢复】
// 如果发现包头不对,说明数据错位了。
// 此时不应该傻等,应该将触发水平设为 1,逐字节扫描去找下一个 0xAA。
// 为了简单起见,这里演示直接把触发水平重置为 5,等下一波。
}
}
// ==========================================
// 收尾:将状态机复位,准备接收下一个包头
// ==========================================
xStreamBufferSetTriggerLevel(xMyStreamBuffer, HEADER_SIZE);
}
}vStreamBufferDelete该函数没有对应的中断版本。
void vStreamBufferDelete(
StreamBufferHandle_t xStreamBuffer // 要被物理超度的流缓冲区句柄
);释放目标流缓冲区。无法释放由静态创建的内存。
当你删除一个缓冲区时,请确保没有任何线程阻塞在该缓冲区上。
xStreamBufferSend该函数对应的 ISR 版本函数为
xStreamBufferSendFromISR。
size_t xStreamBufferSend(
StreamBufferHandle_t xStreamBuffer, // 目标流缓冲区的句柄
const void *pvTxData, // 指向要发送数据的指针(数据源)
size_t xDataLengthBytes, // 你期望发送的字节数
TickType_t xTicksToWait // 阻塞超时时间(如果缓冲区已满或空间不足,任务最多等多久)
);xStreamBufferSend 函数用于在常规任务上下文中向指定的流缓冲区复制并写入字节数据流 。若缓冲区的剩余空间不足以容纳当前需要写入的完整数据,写入任务将根据传入的超时时间参数进入阻塞状态等待 。
该函数的返回值为实际成功写入缓冲区的字节数。
假设你想发送 50 字节,但当前缓冲区只剩 20 字节空位。任务会进入阻塞等待以期接收方腾出空间。如果到达超时时间
xTicksToWait后,接收方仍未读取任何数据,该函数会将此时可用的 20 个空位全部填满,然后立刻返回20。余下的 30 字节将不会被处理(需由开发者在应用层自行循环重发)。同样的, 当尝试发送的数据量大于缓冲区的最大有效容量时,FreeRTOS 底层会直接将期望写入量
xRequiredSpace截断为该缓冲区的最大容量。随后任务进入阻塞,死等缓冲区被完全清空。如果发生超时,它只会将超时瞬间可用的空位填满,并返回实际写入的字节数。总的来说,该函数没有自动重发的功能。
流缓冲区没有互斥功能,因此不能用多个任务读写一个流缓冲区,如果一定要这么做,你需要自己确保互斥。
流缓冲区不提供类似
xQueuePeek的只读不删的功能函数。
xStreamBufferReceive该函数对应的 ISR 版本函数为
xStreamBufferReceiveFromISR。
size_t xStreamBufferReceive(
StreamBufferHandle_t xStreamBuffer, // 目标流缓冲区的句柄
void *pvRxData, // 指向本地接收缓存的指针(读出来的数据存放地)
size_t xBufferLengthBytes, // 你期望读取的最大字节数(通常就是 pvRxData 数组的大小,防止溢出)
TickType_t xTicksToWait // 阻塞超时时间(如果缓冲区里的数据没达到“触发水平”,任务最多等多久)
);xStreamBufferReceive 函数用于从目标流缓冲区中复制出指定长度的字节数据到读取者的目标接收缓冲区中 。如果流缓冲区当前为空,或者其中的数据量尚未达到设定的触发水平,读取任务将按照给定的超时参数进入阻塞状态等待数据到达 。
当缓冲区里的数据量大于触发水平(比如 10)时,函数不会只读 10 个字节,而是尽可能的都取所有字节,除非超过
xBufferLengthBytes设定的最大值。
xStreamBufferBytesAvailable该函数没有对应的中断版本。
size_t xStreamBufferBytesAvailable(
StreamBufferHandle_t xStreamBuffer // 目标流缓冲区的句柄
);返回当前流缓冲区中尚未被读取(未消费)的有效字节总数。如果缓冲区是空的,则返回 0。
用于流缓冲区设计上是为了单对单读写的,因此为了防止等待阻塞而先看有多少数据再读取是符合规范的,不会发生高优先级任务中途抢占 CPU 读取数据导致锁死的情况。只要你遵守单对单的要求的话。
// 假设解析一个包至少需要 16 个字节
if( xStreamBufferBytesAvailable( xMyStreamBuffer ) >= 16 )
{
// 数据足够了!现在去调用 Receive,必然能瞬间拿到 16 个字节,绝不卡顿
xStreamBufferReceive( xMyStreamBuffer, ucBuffer, 16, 0 );
ProcessData( ucBuffer );
}
else
{
// 数据不够,我可以先去干点别的工作(执行其他非阻塞的业务逻辑)
DoOtherWork();
}我们可能需要有一个优先级很低的“监视狗任务 (Monitor Task)”。 它会定期巡视系统内的各个流缓冲区,看看有没有发生数据积压。
void vDiagnosticTask( void * pvParameters )
{
for( ;; )
{
size_t unread_bytes = xStreamBufferBytesAvailable( xUartStreamBuffer );
// 如果积压数据超过容量的 80%,可能意味着接收任务卡死了,或者处理太慢
if( unread_bytes > (BUFFER_TOTAL_SIZE * 0.8) )
{
printf("警告:串口流缓冲区濒临溢出!当前积压 %u 字节\n", unread_bytes);
// 触发报警或重启某些状态机
}
osDelay( pdMS_TO_TICKS( 1000 ) ); // 每秒巡查一次
}
}xStreamBufferSpacesAvailable该函数没有对应的中断版本。
size_t xStreamBufferSpacesAvailable(
StreamBufferHandle_t xStreamBuffer // 目标流缓冲区的句柄
);返回当前流缓冲区中剩余可用的空位字节数。
在静态创建缓冲区的时候我们曾说过 N+1 规则:
实际总占据内存大小 (xBufferSizeBytes) = BytesAvailable (已用) + SpacesAvailable (可用) + 1 (幽灵字节) 。
xStreamBufferIsEmpty该函数没有对应的中断版本。
BaseType_t xStreamBufferIsEmpty(
StreamBufferHandle_t xStreamBuffer // 目标流缓冲区的句柄
);返回值 (BaseType_t):
pdTRUE:流缓冲区是完全空的(里面 1 个字节的数据都没有)。pdFALSE:流缓冲区里有数据(至少有 1 个字节)。xStreamBufferIsFull该函数没有对应的中断版本。
BaseType_t xStreamBufferIsFull(
StreamBufferHandle_t xStreamBuffer // 目标流缓冲区的句柄
);返回值 (BaseType_t):
pdTRUE:流缓冲区已经完全满了(没有任何空位,连 1 个字节都写不进去了)。pdFALSE:流缓冲区还有空位(至少能再写入 1 个字节)。在实际项目中,应该让生产者只关心“满不满”,消费者只关心“空不空”。
xStreamBufferReset该函数没有对应的中断版本。
BaseType_t xStreamBufferReset(
StreamBufferHandle_t xStreamBuffer // 目标流缓冲区的句柄
);返回值 (BaseType_t):
pdPASS:复位成功。此时缓冲区内的所有数据已被清除,读写指针归位。pdFAIL:复位失败!这是一个非常关键的反馈,通常意味着当前有任务正阻塞在这个流缓冲区上。调用 xStreamBufferReset 函数可以将指定的流缓冲区彻底清空,并将其内部状态恢复至初始创建时的模样 。此项复位操作具有严格的前提条件:只有在没有任何任务因读写该缓冲区而处于阻塞等待状态时,复位指令才会被系统允许并成功执行 。
不论是流缓冲区还是消息缓冲区,都不能(或不建议)用 DMA 直接读写。
更多内容,请见“消息缓冲区-相关API函数-高级”描述。
xStreamBufferSendCompletedFromISR这是一个 ISR 函数。
仅限高级用户使用。如果你不知道它的使用场景,那么就不要使用它。
BaseType_t xStreamBufferSendCompletedFromISR(
StreamBufferHandle_t xStreamBuffer,
BaseType_t *pxHigherPriorityTaskWoken
);该函数用来唤醒阻塞于目标缓冲区的所有消费者任务。如果成功唤醒了至少一个任务,则返回pdTRUE,否则返回pdFALSE。
xStreamBufferReceiveCompletedFromISR这是一个 ISR 函数。
仅限高级用户使用。如果你不知道它的使用场景,那么就不要使用它。
BaseType_t xStreamBufferReceiveCompletedFromISR(
StreamBufferHandle_t xStreamBuffer,
BaseType_t *pxHigherPriorityTaskWoken
);该函数用来唤醒阻塞于目标缓冲区的所有生产者任务。如果成功唤醒了至少一个任务,则返回pdTRUE,否则返回pdFALSE。
消息缓冲区是 FreeRTOS系统中用于进程间通信的一种重要机制,它建立在流缓冲区的基础之上。
与流缓冲区传递连续、无边界的字节流不同,消息缓冲区专门用于传递长度不固定的离散消息数据。
消息缓冲区的底层操作和存储逻辑高度依赖于流缓冲区,但其具备自我界定数据边界的特性。
每次向消息缓冲区写入一条消息时,系统会在该条消息数据之前自动附加额外的隐藏字节,专门用于永久记录该条特定消息的精确长度。
得益于每条消息自带的长度信息前缀,消息缓冲区能够确保数据包的完整性与独立性。当接收方尝试从缓冲区读取数据时,系统会首先隐式读取长度前缀,随后按照该长度提取对应字节的数据。这种机制保证了每次提取出的都是一条完整的原始消息,有效避免了数据的截断或多条消息粘连。
继承了流缓冲区的核心特性,消息缓冲区的最佳设计应用场景依然是单个写入者与单个读取者之间的通信模型。无论是在中断服务程序向普通任务单向汇报离散事件,还是在多核架构下不同执行内核间传递定长指令包,它都能高效、安全地完成投递。
xMessageBufferCreate该函数没有对应的中断版本。
MessageBufferHandle_t xMessageBufferCreate(
size_t xBufferSizeBytes // 缓冲区的总容量(以字节为单位)
);
MessageBufferHandle_t xMessageBufferCreateStatic(
size_t xBufferSizeBytes,
uint8_t * const pucMessageBufferStorageArea,
StaticStreamBuffer_t * const pxStaticMessageBuffer
) 在程序中动态分配并创建一个消息缓冲区需调用 xMessageBufferCreate 函数。开发者需传入所期望的缓冲区总字节容量。
值得特别注意的是,由于每条存入的消息都会被强制附加长度记录前缀,开发者在计算和设定总容量时,必须将这些必然产生的额外内存开销考虑在内。
vMessageBufferDelete该函数没有对应的中断版本。
void vMessageBufferDelete(
MessageBufferHandle_t xMessageBuffer // 要销毁的消息缓冲区句柄
);删除指定消息缓冲区。
当你删除一个消息缓冲区时,请确保没有任何线程阻塞在该缓冲区上。
xMessageBufferSend该函数对应的中断版本为
xMessageBufferSendFromISR。
size_t xMessageBufferSend(
MessageBufferHandle_t xMessageBuffer, // 目标消息缓冲区的句柄
const void *pvTxData, // 指向你要发送的数据(消息体)的指针
size_t xDataLengthBytes, // 消息体的纯数据长度(不包括底层偷偷加的 4 字节长度头!)
TickType_t xTicksToWait // 阻塞超时时间
);xMessageBufferSend 函数用于在常规任务上下文中将一条离散消息发送至指定的消息缓冲区。
若缓冲区剩余空间不足以容纳该条带有长度前缀的完整消息,执行发送的任务将依据超时时间进入阻塞状态。
该函数的返回值表示实际写入的字节数(不包含前缀头)。
这点体现了和流缓冲区之间的区别。流缓冲区会尽可能的填满剩余内容,即使可能造成数据的截断;而消息缓冲区只会以消息为基本单元,不会再分。
当你试图向总空间只有 20 的消息缓冲区发送 30 字节的消息时,系统会无视等待时间而直接返回失败。
xMessageBufferReceive该函数对应的中断版本为
xMessageBufferReceiveFromISR。
size_t xMessageBufferReceive(
MessageBufferHandle_t xMessageBuffer, // 目标消息缓冲区的句柄
void *pvRxData, // 指向本地接收缓存的指针(你要把读出来的消息存哪)
size_t xBufferLengthBytes, // 你的本地缓存的最大容量(极其重要!)
TickType_t xTicksToWait // 阻塞超时时间(如果没有消息,最多等多久)
);该函数会从指定的消息缓冲区中剥离出一条最老的完整消息,并将其复制到用户定义的接收数组中。
若缓冲区内当前没有任何有效消息,读取任务将根据设定的等待节拍数进入阻塞休眠状态。
该函数的返回值表示实际读取的字节数(不包含头)。
系统内置了一系列状态查询函数用于监控消息缓冲区的实时水位。开发者可以使用特定的空间查询函数获取当前缓冲区内还剩余多少可用字节,或者评估剩余空间是否还能容纳下一次的大型消息写入。
xMessageBufferIsEmpty该函数没有对应的中断版本。
BaseType_t xMessageBufferIsEmpty(
MessageBufferHandle_t xMessageBuffer // 目标消息缓冲区的句柄
);查询当前缓冲区是否为空。
xMessageBufferIsFull该函数没有对应的中断版本。
BaseType_t xMessageBufferIsFull(
MessageBufferHandle_t xMessageBuffer // 目标消息缓冲区的句柄
);判断消息缓冲区空间是否已满。
即便没满也不一定能发消息,请不要把这个函数作为发送消息的条件1。
你应该使用下面的
xMessageBufferSpaceAvailable。
xMessageBufferSpaceAvailable、xMessageBufferSpacesAvailable该函数没有对应的中断版本。
size_t xMessageBufferSpaceAvailable(
MessageBufferHandle_t xMessageBuffer // 目标消息缓冲区的句柄
);该函数返回底层缓冲区中剩余的空闲字节数。
之所以有两个函数名是因为当初作者少打了个 s,后面为了向前兼容不得不保留错误的函数名。
当你将该函数作为是否发消息的判断时,需要注意消息所需空间=消息本身长度+4。
xMessageBufferNextLengthBytes该函数没有对应的中断版本。
size_t xMessageBufferNextLengthBytes(
MessageBufferHandle_t xMessageBuffer // 目标消息缓冲区的句柄
);查询当前缓冲区最前方的消息的有效载荷长度。
0。有点类似于 Peek 函数,只看不取。不过 peek 会返回具体内容,而该函数只会返回长度。
xMessageBufferReset该函数没有对应的中断版本。
BaseType_t xMessageBufferReset(
MessageBufferHandle_t xMessageBuffer // 目标消息缓冲区的句柄
);执行此操作会将缓冲区清空并恢复到刚完成创建时的初始状态。
若业务逻辑需要彻底清空消息缓冲区内的所有历史数据残留,可以调用 xMessageBufferReset 函数。
使用该函数时必须保证当前必须没有任何其他任务正因为等待读写该缓冲区而处于挂起阻塞状态。
不能使用 DMA 来读写流缓冲区或消息缓冲区,再调用函数通知系统。
因为缓冲区除了数据外,还有一个读写指针。消息缓冲区还有一个额外的消息头;当使用 DMA 读写数据时,虽然将数据读出来了,但读写指针没变,此时通知系统不会发生任何事。
如果你一定要这么做,需要你自己想办法获取读写指针,再读写完毕后手动操作该指针。
xMessageBufferSendCompletedFromISR这是一个 ISR 函数。
仅限高级用户使用。如果你不知道它的使用场景,那么就不要使用它。
BaseType_t xMessageBufferSendCompletedFromISR(
MessageBufferHandle_t xMessageBuffer, // 目标消息缓冲区的句柄
BaseType_t *pxHigherPriorityTaskWoken // 高优先级任务唤醒标志的指针(熟悉吗?和你最开始问的那个 RTC 中断一模一样)
);该函数用来唤醒阻塞于目标缓冲区的所有消费者任务。如果成功唤醒了至少一个任务,则返回pdTRUE,否则返回pdFALSE。
平时使用的
xMessageBufferSendFromISR自带任务唤醒功能,不需要调用该函数。当我们通过非 FreeRTOS API 的方式向缓冲区写入数据时(如通过指针强行写入),可以调用此函数来通知系统有新数据。
该函数常常用于多核项目中,多个芯片之间通过共享内存通信,当其中一个核向内存中的缓冲区写入数据后,可以通过触发硬件中断的方式通知其它核唤醒对应线程并读取数据。
xMessageBufferReceiveCompletedFromISR这是一个 ISR 函数。
仅限高级用户使用。如果你不知道它的使用场景,那么就不要使用它。
BaseType_t xMessageBufferReceiveCompletedFromISR(
MessageBufferHandle_t xMessageBuffer, // 目标消息缓冲区的句柄
BaseType_t *pxHigherPriorityTaskWoken // 高优先级任务唤醒标志指针
);该函数用来唤醒阻塞于目标缓冲区的所有生产者任务。如果成功唤醒了至少一个任务,则返回pdTRUE,否则返回pdFALSE。