Comunicación inter-tareas (II): Mensajes (message buffers)

En la lección anterior estudiamos un mecanismo para pasar secuencias de bytes entre tareas, o de una interrupción a una tarea. Prácticamente todos los protocolos de comunicaciones son orientados al byte, lo que pone de manifiesto la importancia de este tipos de xxx.

Sin embargo, en casi todas nuestras aplicaciones vamos a necesitar transferir información de datos de tipos simples (int, float, etc) y de datos compuestos (struct), ya sea en forma de una variable o de un arreglo. Para estas ocasiones FreeRTOS incluyó los mensajes, cuya filosofía es la misma que la de los flujos, con la diferencia de que podrás transferir información de cualquier tipo, y no sólo bytes (por byte debemos entender: char, unsigned char, uint8_t, int8_t).

En la lección del día de hoy vamos a estudiar los mensajes explicando sus detalles y viendo 4 ejemplos representativos de su utilización.

Tabla de contenidos

¿Qué es y qué necesito?

Los mensajes (message buffers, en FreeRTOS) son un mecanismo que está basado en los flujos y que sirve para transferir información compleja (más de un byte), ya sea de tipos básicos del lenguaje C o de tipos compuestos.

Aunque similiares, los mensajes tienen diferencias importantes con respecto a los flujos. Una de éstas, y la más importante, es con respecto a la integridad de la información. Con los flujos puedes leer los bytes que tú quieras, pero esto no funciona con tipos diferentes a los bytes. Imagina que quieres usar un flujo para transmitir un entero de 32 bits (el cual requiere 4 bytes), pero a la hora de leer la información lees nada más 3 bytes. Esto es claramente un error que debemos evitar (pero que nos podría pasar), y los mensajes ya lo toman en cuenta; de hecho, en los mensajes no existe el punto de disparo (trigger point).

En uno de los ejemplos de la lección anterior la tarea consumidora escribía bytes individuales, y una vez que se alcanzaba un determinado número de ellos, la tarea consumidora era despertada y consumía todos. Bueno, esto no lo puedes hacer con los mensajes, afortunadamente. Esto es, si utilizando mensajes quieres recibir un arreglo de ints, entonces la tarea productora debe enviar un arreglo de ints con el mismo número de elementos. Si quieres transferir un dato de tipo float, entonces la tarea consumidora transmitiría exactamente 4 bytes y la tarea consumidora tomará exactamente los 4 bytes. Y esto es bueno.

Para que puedas utilizar los mensajes en tus programas asegúrate que el archivo stream_buffer.c sea parte del proceso de compilación. Este archivo ya está incluído cuando descargas FreeRTOS, y en consecuencia, en el proyecto Molcajete también está donde debe de estar, por lo que no debes procuparte por ello. Así mismo, en cada archivo fuente donde uses a los mensajes deberás agregar el encabezado #include <message_buffer.h>.

(Los mensajes están basados en los flujos, por lo que no existe un archivo message_buffer.c, pero lo que sí existe es el archivo de encabezado message_buffer.h.)

También es necesario que la constante INCLUDE_xTaskGetCurrentTaskHandle esté puesta a 1 (en el archivo de configuración FreeRTOSConfig.h):

#define INCLUDE_xTaskGetCurrentTaskHandle 1

Antes de ver las funciones de la API de los mensajes debes tener en cuenta en todo momento la integridad de la información, y para esto FreeRTOS impone unas reglas a los tamaños de los búferes de datos que vamos a utilizar; si estableces un búfer al que le falte un byte de lo que FreeRTOS espera, entonces no habrá transferencia de datos. Y eso es bueno. Los tamaños de los búferes se obtienen a partir de fórmulas simples, pero que hay que seguir al pie de la letra, ya que tienen que ver con la cantidad de elementos que quieres escribir/leer y con el tamaño en bytes de cada elemento. Si los calculas más pequeños de lo que tienen que ser, entonces no habrá transferencia de datos.

Para facilitar el uso de los mensajes en nuestros programas escribí 5 constantes numéricas, las cuales vamos a utilizar en los ejemplos de esta lección. De éstas tú estableces las dos primeras ; las últimas 3 se calculan de manera automática.

#define MESSAGE_ELEMS ( 8 )

Es el número de elementos (no de bytes) que deseas escribir/leer. Esta constante la debes utilizar en ambas funciones xMessageBufferSend() y xMessageBufferReceive() (las cuales explicaré más adelante). El número de elementos puede ir desde 1.

#define MESSAGE_TYPE float

Aquí estableces el tipo de dato del o de los elementos que deseas transmitir. En este ejemplo se trata de variables float, pero pueden ser de tipo compuesto, como lo verás en los ejemplos.

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

Calcula el tamaño del arreglo interno subyacente al mensaje. Esta constante la utilizarás en las funciones de creación del mensaje. Más adelante explicaré la fórmula. No deberías manipularlo directamente.

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

Calcula el número de bytes que necesita una transferencia. No deberías manipularlo directamente.

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )

Calcula el número de bytes que necesita un elemento de tipo MESSAGE_TYPE. No deberías manipularlo directamente.

La constante MESSAGE_BUFFER_SIZE requiere su propia explicación. El «manual» (es decir, la documentación oficial) dice que FreeRTOS agrega a la secuencia a transferir el número de bytes del mensaje. Esto es, si tu transferencia es de 100 bytes, entonces se transmitirán 104 bytes (ahorita explico el ‘4’). El problema es el siguiente: imagina que vas a transferir un arreglo de 5 elementos, donde cada elemento ocupa 20 bytes. Y si tú estableces que MESSAGE_BUFFER_SIZE sea de 100 bytes, entonces solamente podrás transferir, en una pasada, 4 de los 5 elementos, y siempre quedarán 16 bytes inutilizables. La solución es establecer MESSAGE_BUFFER_SIZE a 5*20+4=104 (bytes). Con esto aseguras transferir los 5 elementos en una pasada.

¿Y de dónde salió ese ‘4’? El «manual» dice que el número de bytes agregados, esos que guardan el tamaño del mensaje, deben ser equivalentes al número de bytes que ocuparía una variable del tipo size_t (éste es un alias para unsigned int), y la fórmula quedaría así:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )

El propio manual explica que un valor común para size_t es de 32 bits, o 4 bytes, lo cual es cierto para la mayoría de sistemas, pero no para el compilador GCC-AVR utilizado por Arduino. En esta plataforma, por lo menos para el chip ATmega328, ¡size_t es de 2 bytes! Y por supuesto, aunque los programas compilarían, no funcionarían como deben (ya le pasó al primo de un amigo). Por lo tanto, en lugar de utilizar size_t en la fórmula me ví obligado a escribir el valor 4. En otras plataformas no debería existir ese problema.

Desarrollo

Los mensajes necesitan un búfer de datos interno, es decir, un lugar dónde guardar cada byte. Este búfer es un arreglo de elementos uint8_t de tamaño MESSAGE_BUFFER_SIZE cuando creas a los objetos. Cuando utilizas creación dinámica de objetos este arreglo se crea de manera automática dentro de la función xMessageBufferCreate(), por lo cual no debes preocuparte de nada (en apariencia. Siempre que te sea posible utiliza la creación estática de objetos, es más segura); mientras que si usas la creación estática de objetos, tú eres responsable de pasarle dicho arreglo a la función xMessageBufferCreateStatic(). A continuación veremos ambas formas.

(Para conocer o recordar la diferencia y la configuración entre objetos dinámicos y estáticos, te recomiendo leer esta lección y esta otra lección, respectivamente. Aunque ambas hablan sobre tareas, la teoría aplica para cualquier tipo de objeto.)

Creación dinámica

La función para crear mensajes dinámicos es:

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. Para realizar transferencias completas en una sola pasada recuerda agregar size_t bytes extras, es decir, sizeof( size_t ) bytes. Puedes utilizar el valor MESSAGE_BUFFER_SIZE del que hablé hace un momento.

Valor devuelto. Si hubo memoria suficiente para crear al mensaje, entonces la función devuelve el handler; en caso contrario (no hubo memoria suficiente) devuelve NULL. Este handler lo debes de guardar, tanto para que el resto de funciones de flujos sepan sobre cuál van a operar, como para probar que efectivamente fue creado.

Nota que esta función, a diferencia de la función xStreamBufferCreate(), no incluye al parámetro para el punto de disparo, xTriggerLevelBytes.

Por ejemplo, si en tu programa quisieras transferir un arreglo de 8 valores float, entonces tendrías lo siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>

MessageBufferHandle_t g_message_buffer = NULL;
// el handler debe ser global

#define MESSAGE_ELEMS ( 8 )
#define MESSAGE_TYPE float
#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )
// forma general que NO funciona para el ATmega328, pero en el ejemplo
// completo dado más adelante, ya lo arreglamos

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

void setup()
{
   // código ...

   g_message_buffer = xMessageBufferCreate( MESSAGE_BUFFER_SIZE );

   configASSERT( g_message_buffer );
   // Error creando al mensaje

   // más código ...
}

El handler, g_message_buffer, es global porque debe ser visible tanto en la tarea productora como en la consumidora.

La creación dinámica de objetos no asegura que los objetos se creen, principalmente por escasez de memoria RAM. Por eso siempre deberías preguntar si el objeto realmente fue creado, y aunque existen diversas maneras de hacerlo, en este ejemplo escogí la macro configASSERT() que es una versión propia de la conocida assert() de C (en el archivo FreeRTOSConfig.h podrás ver la implementación que utilicé para el projecto Molcajete). Por supuesto que puedes usar al condicional if y tratar de recuperarte sin terminar el programa de manera abrupta.

Creación estática

Ahora veamos cómo crear a un mensaje de manera estática. Como es usual, esta forma de creación de objetos necesita más pasos, pero es mucho más segura ya que los objetos siempre serán creados.

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. Para realizar transferencias completas en una sola pasada recuerda agregar size_t bytes extras, es decir, sizeof( size_t ) bytes. Puedes utilizar el valor MESSAGE_BUFFER_SIZE del que hablé hace un momento.

pucMessageBufferStorageArea. Es el búfer interno del mensaje donde se guardarán los datos, y no es otra cosa que un arreglo de elementos de tipo uint8_t de tamaño al menos xBufferSizeBytes + 1. Este arreglo deberá existir a lo largo del programa, por lo cual deberás crearlo de manera global o estática a la función donde fue llamada xMessageBufferCreateStatic().

pxStaticMessageBuffer. Esta es la variable que guarda el estado del mensaje, es decir, la estructura de datos que maneja al mensaje (el mensaje es algo más que un simple arreglo). Al igual que el parámetro pucMessageBufferStorageArea, deberá existir a lo largo del programa.

Valor devuelto. El handler al mensaje. La creación estática de objetos nunca falla, a menos que el arreglo o la variable de estado sean NULL, cosa que tal vez nunca suceda.

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>

MessageBufferHandle_t g_message_buffer = NULL;
// el handler debe ser global

#define MESSAGE_ELEMS ( 8 )
#define MESSAGE_TYPE float
#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )
// forma general que NO funciona para el ATmega328, pero en el ejemplo
// completo dado más adelante, ya lo arreglamos

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

void setup()
{
   // código

   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer =              // handler (debe ser global)
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,      // tamaño del búfer interno
         message_buffer_array,     // arreglo subyacente que hace las veces de búfer interno
         &message_buffer_struct ); // variable de estado

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el mensaje fue creado o no

   // más código ...
}

El tamaño del arreglo subyacente (message_buffer_array[]), es decir, el búfer donde se guardan los datos, debe ser un byte más grande que el establecido en el argumento xBufferSizeBytes. En este ejemplo marqué a las variables message_buffer_array y message_buffer_struct como static; esto para que perduren durante todo el programa, pero que a la vez sean invisibles al resto de funciones. Una alternativa es que las declares globales, aunque entre menos variables globales tengas en tus programas, mejor. Sin embargo el handler, g_message_buffer, sí que debe ser global porque tanto la tarea productora como la consumidora deben tener acceso a él. Este es un ejemplo de las veces en que sí son necesarias las variables globales. El prefijo g_ nos recuerda que es una variable global y que debemos tener mucho cuidado con ella.

Escribiendo y leyendo al mensaje

Una vez que el mensaje ha sido creado ya podemos utilizarlo escribiendo en y leyendo desde él. Los datos que deseas transmitir se deben encontrar en un búfer de tu aplicación, y dependiendo de lo que desees transmitir este búfer puede ser un arreglo o una variable, ya sean tipos básicos o tipos compuestos. En los ejemplos que veremos utilizamos ambas para que veas cómo se utilizan.

La función para escribir en el mensaje es:

xMessageBuffer. Es el handler del mensaje en el que queremos escribir.

pvTxData. Es un apuntador al búfer que queremos transmitir. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados al mensaje.

xDataLengthBytes. Es la cantidad máxima de bytes que deseas escribir al mensaje en una sola llamada. El número lo obtienes de multiplicar el número de elementos por el tamaño en bytes que ocupa cada elemento. Puedes utilizar el valor MESSAGE_SIZE del que hablé hace un momento.

xTicksToWait. Si no hay espacio disponible para una nueva copia de xDataLengthBytes bytes en el mensaje, entonces la tarea productora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Esta función devuelve el número de bytes escritos al mensaje. Para asegurarte de mantener la integridad de la información, siempre deberías verificar este valor. En los ejemplos te muestro cómo hacerlo.

La función para leer desde el mensaje es:

xMessageBuffer. Es el handler del mensaje del que queremos leer.

pvRxData. Es un apuntador al búfer en el que los datos leídos se van a guardar. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados desde el mensaje al búfer.

xBufferLengthBytes. Es la cantidad máxima de bytes que deseas leer desde el mensaje en una sola llamada. El número lo obtienes de multiplicar el número de elementos por el tamaño en bytes que ocupa cada elemento. Puedes utilizar el valor MESSAGE_SIZE del que hablé hace un momento.

xTicksToWait. Es el tiempo de espera máximo que la tarea consumidora estará en el estado Blocked mientras arriban bytes a partir de que el búfer estuviera vacío. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Esta función devuelve el número de bytes leídos desde el flujo. Para asegurarte de mantener la integridad de la información, siempre deberías verificar este valor. En los ejemplos te muestro cómo hacerlo.

Ejemplos

Dada la importancia de transferir información de tipos básicos y compuestos creí conveniente realizar 4 ejemplos que muestran las diferentes formas en que podemos usar a los mensajes en nuestros programas:

  1. Transfiere un arreglo de valores float. El mensaje se crea de forma dinámica.
  2. Transfiere un valor float. El mensaje se crea de forma estática.
  3. Transfiere un arreglo de variables compuestas. El mensaje se crea de forma estática.
  4. Transfiere una variable compuesta. El mensaje se crea de forma estática.

Para los ejemplos 3 y 4 tomé como plantilla al ejemplo 2, de ahí que los 3 ejemplos (2, 3 y 4) utilicé la creación estática de mensajes. Sin embargo, con lo visto en el ejemplo 1 podrás crear mensajes dinámicos sin ningún inconveniente.

Nota cómo, cuándo y dónde utilicé las constantes simbólicas que vimos hace un momento. Utilizarlas te facilitará la programación y te ahorrará los problemas que yo enfrenté. En particular las funciones xStreamBufferSend() y xStreamBufferReceive() utilizan al mismo valor MESSAGE_SIZE para indicar el número de bytes que se escriben y se leen.

Hace un rato mencioné que la fórmula general para obtener el tamaño del búfer interno del mensaje es:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )

Pero para el caso del ATmega328, utilizando al compilador GCC-AVR, en todos los ejemplos la he modificado a:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )

Si tu compilador te entrega como resultado de sizeof( size_t ) el valor 4, entonces utiliza la fórmula general (para que no te involucres con magic numbers).

En las funciones de creación de las tareas notarás que utilicé como tamaño de pila el valor 128*3:

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );
   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );
   // ...
}

En lecciones anteriores estuve usando el valor 128, porque a los más declarábamos una o dos variables dentro de las tareas. Sin embargo, en esta lección (y en la anterior) hemos estado declarando arreglos de valores dentro de las tareas, y la memoria para estos arreglos es tomada de la pila de la tarea, por lo cual el tamaño de la pila debe considerarlos. Quizás exageré en un valor tan grande, pero tú siempre podrás calcularlo de acuerdo a las necesidades de tus aplicaciones.

En la lección anterior ya había advertido de lo miserable que se comportan las funciones Serial.print() y similares en programas concurrentes. Por supuesto que este triste comportamiento también aparece con los mensajes. Por eso notarás que en los ejemplos agregué información de depuración que puede ser habilitada o deshabilitada a tu gusto, estableciendo un valor conveniente en la constante USER_DBG:

#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())

Ejemplo 1: Transfiere un arreglo de valores float. El mensaje se crea de forma dinámica.

Este ejemplo muestra cómo transferir un arreglo de valores float. Puedes modificarlo para usarlo con valores enteros, int, o reales dobles, double.

Así mismo, y como ya había mencionado, deberías de verificar que el número de bytes transmitidos efectivamente es el mismo que tú especificaste:

if( bytes_sent != MESSAGE_SIZE ){ // timeout:
   Serial.print( "TO(W): " );
} 

Obviamente debes hacer la misma prueba del lado de la función consumidora:

if( received_bytes < MESSAGE_SIZE ) { // timeover:
   Serial.println( "TO(R)" );
} 

El ejemplo completo es el siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;


#define MESSAGE_ELEMS ( 8 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE float
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   MESSAGE_TYPE out_buffer[ MESSAGE_ELEMS ];

   uint8_t index = 0;

   float data = 0.5;

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 500 ) );

      for( uint8_t i = 0; i < MESSAGE_ELEMS; ++i ){
         
         out_buffer[ i ] = data;

         data += 0.5;

      }

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) out_buffer,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   MESSAGE_TYPE in_buffer[ MESSAGE_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) in_buffer,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         for( size_t i = 0; i < received_bytes; ++i ){
            Serial.println( in_buffer[ i ] );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_message_buffer = xMessageBufferCreate( MESSAGE_BUFFER_SIZE );

   configASSERT( g_message_buffer );
   // Error creando al flujo

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   vTaskStartScheduler();
}

void loop() {}

Una salida de este programa con la constante USER_DBG puesta a 1 es:

Ejemplo 2: Transfiere un valor float. El mensaje se crea de forma estática

Este ejemplo muestra cómo transferir un sólo valor float. Puedes modificarlo para usarlo con valores enteros, int, o reales dobles, double. Presta especial atención a la constante MESSAGE_ELEMS la cual está puesta a 1, ya que es éste el número de elementos que queremos transferir. Así mismo, como no tenemos necesidad de declarar un arreglo, pasaremos a las funciones de escritura/lectura la dirección de una variable float, la cual hace las veces de búfer (sí, una simple variable puede ser un búfer, todo depende del contexto).

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

#define MESSAGE_ELEMS ( 1 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE float
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 2
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   float data = 0.0;

   while( 1 )
   {

      vTaskDelay( pdMS_TO_TICKS( 500 ) );
      data += 0.5;

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) &data,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   float data = 0.0;

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) &data,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         Serial.print( "DATA: " );
         Serial.println( data );
      }
#endif
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa, con la constante USER_DBG puesta a 2 para imprimir el valor transferido, ya que la carga de Serial.print() no es tan alta (es sólo un valor float), es:

Ejemplo 3:Transfiere un arreglo de variables compuestas. El mensaje se crea de forma estática

Aunque es muy útil transferir variables o arreglos de tipos básicos, es igual de útil transferir información más compleja. En este ejemplo (y en el siguiente) tendremos un tipo compuesto Message (tú puedes darle el nombre que mejor se adapte en tu aplicación):

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;

La idea de este tipo compuesto es tener dos variables de diferentes tipos lógicamente relacionadas bajo un mismo nombre (lo cual coincide con la definición de estructura en C). El campo cont simplemente es un contador ascendente, mientras que el campo temp es la temperatura ambiente (a través de un sensor LM35). El ejemplo transferirá un arreglo de 4 elementos de tipo Message.

Nota que la constante MESSAGE_TYPE ahora está puesta al nombre que le dimos a la estructura, es decir:

#define MESSAGE_TYPE Message

El ejemplo completo es:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;


#define MESSAGE_ELEMS ( 4 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE Message
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())



void producer_task( void* pvParameters )
{
   Message data_tx[ MESSAGE_ELEMS ] = { 0 };

   while( 1 )
   {
      for( size_t i = 0; i < MESSAGE_ELEMS; ++i ){

         vTaskDelay( pdMS_TO_TICKS( 100 ) );

         data_tx[ i ].cont++;
         data_tx[ i ].temp = ( analogRead( A0 ) * 5.0 * 100.0 ) / 1024.0;
      }

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) data_tx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   Message data_rx[ MESSAGE_ELEMS ] = { 0 };

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) data_rx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         for( size_t i = 0; i < MESSAGE_ELEMS; ++i ){

            Serial.print( "CONT: " );
            Serial.print( data_rx[ i ].cont );

            if( data_rx[ i ].temp > 23.0 ){
               Serial.print( ", TEMP: " );
               Serial.print( data_rx[ i ].temp );
            }
            Serial.print( "\n" );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje


   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\nRESET" );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa con la constante USER_DBG puesta a 1 es:

Ejemplo 4: Transfiere una variable compuesta. El mensaje se crea de forma estática

Este último ejemplo muestra cómo transferir una variable simple de tipo Message. Las tareas productora y consumidora declaran sendas variables del tipo mencionado, mientras que la constante MESSAGE_ELEMS toma el valor 1:

#define MESSAGE_ELEMS ( 1 )

void producer_task( void* pvParameters )
{
   Message data_tx = { 0 };
   // ...
}

void consumer_task( void* pvParameters )
{
   Message data_rx = { 0 };
   // ...
}

El ejemplo completo es:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;


#define MESSAGE_ELEMS ( 1 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE Message
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   Message data_tx = { 0 };

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 500 ) );

      data_tx.cont++;
      data_tx.temp = ( analogRead( A0 ) * 5.0 * 100.0 ) / 1024.0;

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) &data_tx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   Message data_rx = { 0 };

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) &data_rx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         Serial.print( "CONT: " );
         Serial.print( data_rx.cont );
         Serial.print( ", TEMP: " );
         Serial.println( data_rx.temp );
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\nRESET" );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa con la constante USER_DBG puesta a 1 es:

Otras funciones de la API

Hemos estudiado apenas las funciones más útiles de la API, sin embargo quedan algunas que es importante que las conozcas (aquí podrás ver la API completa).

ESCRITURA Y LECTURA DESDE INTERRUPCIONES

En lecciones anteriores hablé de funciones espejo que deben ser utilizadas cuando son llamadas desde dentro de una interrupción (ISR); a este tipo de funciones FreeRTOS le llama interrupt safe version. Los mensajes tienen dos:

Los tres primeros parámetros y el valor devuelto son lo mismo que sus contrapartes. Pero observa dos diferencias fundamentales:

  • No tienen al parámetro xTicksToWait; esto es así porque una ISR no debe bloquearse jamás.
  • Las funciones ponen a pdTRUE el parámetro pxHigherPriorityTaskWoken si la llamada a éstas implica que una tarea de alta prioridad pasó del estado Blocked al estado Ready y es necesario hacer un cambio de contexto, es decir, pasarla al estado Run.

El siguiente fragmento muestra su posible uso (no lo probé, pero así se usa):

void producer_ISR()
{
   isr_ack();
   // "reconocemos" la interrupción (ack -> acknowledge)


   BaseType_t pxHigherPriorityTaskWoken = pdFALSE;
   // siempre debe ser inicializada a pdFALSE

   static uint32_t data = 0;

   size_t bytes_sent = 
     xMessageBufferSendFromISR( 
        g_message_buffer,
        (void*) &data,
        MESSAGE_SIZE,
        &pxHigherPriorityTaskWoken );

   ++data;
   // simulamos la información

   if( bytes_sent != MESSAGE_SIZE ){
      // no hubo espacio en el flujo
   }

   if( pxHigherPriorityTaskWoken != pdFALSE ) taskYIELD;
   // cambio de contexto opcional, pero recomendable
}

FUNCIONES AUXILIARES

La API de los mensajes incluye una serie de funciones para preguntar o alterar el estado del flujo:

  • BaseType_t xMessageBufferIsEmpty( MessageBufferHandle_t xMessageBuffer ). Devuelve pdTRUE si el mensaje indicado en xMessageBuffer está vacío, o pdFALSE en caso contrario.
  • BaseType_t xMessageBufferIsFull( MessageBufferHandle_t xMessageBuffer ). Devuelve pdTRUE si el mensaje indicado en xMessageBuffer está lleno, o pdFALSE en caso contrario.
  • size_t xMessageBufferBytesAvailable( MessageBufferHandle_t xMessageBuffer ). Devuelve el número de elementos actualmente en el mensaje indicado por xMessageBuffer que se pueden leer.
  • size_t xMessageBufferSpacesAvailable( MessageBufferHandle_t xMessageBuffer ). Devuelve el número de elementos libres en el mensaje indicado por xMessageBuffer, es decir, cuánto espacio queda libre.
  • BaseType_t xMessageBufferReset( MessageBufferHandle_t xMessageBuffer ). Pone a su estado inicial al mensaje indicado por xMessageBuffer.

¿Qué sigue?

Transferir información de una tarea a otra, o de una interrupción a una tarea, es una actividad muy común y muy importante en nuestros sistemas embebidos. En la lección de hoy vimos cómo transferir datos de tipos simples (diferentes de bytes) y de tipos compuestos utilizando el mecanismo de FreeRTOS, mensajes (en su nomenclatura oficial, message buffers). Los mensajes están basados en los flujos, pero salvaguardan la integridad de la información evitando que leas 3 bytes de un entero de 32 bits, o 1 byte de un número real de 4 bytes.

RECUERDA: Utiliza las constantes que te di al principio de esta lección, en lugar de calcular por tu cuenta los diferentes tamaños de búferes y bytes a transferir.

RECUERDA: Siempre que te sea posible utiliza objetos estáticos.

RECUERDA: Si vas a llamar a una función de FreeRTOS desde una interrupción, verifica que ésta sea interrupt safe version, es decir, tenga terminación FromISR.

RECUERDA: Los mensajes se utilizan para transferir todos los tipos básicos de C y tipos compuestos. Si tu intención es transferir bytes sin ningún formato, ni inicio ni fin, entonces deberías utilizar los flujos.

En la siguiente lección estudiaremos los semáforos mutex, los cuales nos pueden ayudar a que un recurso no sea accesado por más de una tarea al mismo tiempo.

(En esta página encontrarás información de suma importancia sobre los flujos, mensajes y notificaciones. Deberías leerla en cuanto te sea posible.)

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.