Comunicación inter-tareas (I): Flujos (stream buffers)

En los sistemas concurrentes y multitarea las diferentes tareas deben pasarse información; de otra manera el sistema no tendría utilidad. En los sistemas de cómputo tradicional con una o varias CPUs el sistema operativo reserva una área de la memoria RAM para escribir y leer información; a esta memoria se le conoce como memoria compartida, y dependiendo de la complejidad del sistema ésta podría estar protegida. En esta memoria compartida uno o varios procesos escriben, y uno o varios procesos leen, todo bajo la supervisión del sistema operativo, y quizás, de la propia CPU.

En nuestros sistemas embebidos de pequeña escala la memoria compartida podría tomar la forma de variables globales, arreglos globales o listas enlazadas globales, y casi siempre sin intervención ni supervisión de un sistema operativo. Esto es, podríamos declarar una variable global que sea escrita por una o más tareas y leída por una o más tareas, pero aunque este esquema es simple, es inseguro e ineficiente: ¿cómo sabemos que el dato está listo para ser leído?, ¿cómo evitamos que dos o más tareas escriban al mismo tiempo en la variable?

Para lograr una comunicación inter-tareas (inter-task communication, en inglés) segura y eficiente debemos utilizar esquemas más elaborados, de los cuales, afortunadamente, FreeRTOS incluye una cantidad de ellos, a saber:

  • Notificaciones. Sirven para pasar un dato de hasta 32 bits, y ya las estudiamos en esta lección.
  • Flujos. Sirven para pasar una cadena de bytes (sin formato) de una tarea a otra. Es el tema de la lección de hoy.
  • Mensajes. Sirven para pasar objetos (con formato) de una tarea a otra. Es el tema de la siguiente lección.
  • Colas. Igual que los mensajes. Históricamente aparecieron en FreeRTOS antes que los mensajes, y tienen ventajas sobre éstos, por ejemplo, diferentes tareas pueden escribir a la cola, y diferentes tareas pueden leer de la cola; esto no es posible ni con los fujos ni mensajes. Es el tema de una siguiente lección.

Las notificaciones, flujos y mensajes son de reciente aparición en FreeRTOS y han sido implementados como una opción ligera (consume menos recursos, pero con algunas restricciones) a las colas, las cuales existen prácticamente desde el inicio de de FreeRTOS.

Tabla de contenidos

¿Qué es y qué necesito?

Un flujo (stream buffer, en FreeRTOS) es una secuencia de bytes (es decir, datos de tipo uint8_t) para pasar información de una tarea a otra, o de una interrupción a una tarea. Como veremos más adelante, podemos pasar de una tarea a otra un arreglo completo de datos uint8_t, o juntar datos uint8_t individuales hasta completar un cierto número, después de lo cual la tarea consumidora los consumirá.

Hago énfasis en que el tipo de dato debe ser uint8_t, ya sea un arreglo o variable simple, porque pretender utilizar los flujos para pasar datos de tipos diferentes al mencionado (int, float, struct) conlleva una serie de problemas, que si no son resueltos, corromperán los datos (ya me pasó).

A TOMAR EN CUENTA: Los flujos (stream buffers, en FreeRTOS) se utilizan para pasar datos cuyo tipo nativo sea uint_8 (o su equivalente unsigned char). Si necesitas pasar tipos simples (int, float) o tipos compuestos (struct), entonces debes usar los mensajes (message buffers, en FreeRTOS).

Mucha de la información generada y procesada en nuestros sistemas embebidos es de tipo uint8_t:

  • Los datos que entran y salen por las UARTs.
  • Los datos que entran y salen por los canales SPI desde/hacia periféricos.
  • Los datos que entran y salen por los canales I2C e IIS desde/hacia periféricos.

Cuando te encuentres en uno de estos casos, entre otros muchos que no he enlistado, entonces utiliza los flujos.

Algo que debes saber cuando utilizas flujos (y mensajes y colas) es que cada byte se escribe y se lee por copia, en lugar de por referencia. Supogamos que en tu tarea tienes un búfer de datos para transmitir (un búfer es un arreglo) de 80 bytes. Cuando haces la llamada a la función xStreamBufferSend() (la cual veremos en un momento) los 80 bytes de tu búfer se copian al búfer interno del flujo (este búfer interno es un arreglo de tipo uint8_t). Cuando la tarea consumidora hace la llamada a la función xStreamBufferReceive() (la cual veremos en un momento) los 80 bytes del búfer interno del flujo se copian al arreglo que hayas declarado como búfer de recepción.

Otra cosa que debes saber cuando utilizas flujos y mensajes es que éstos están optmizados para operar tarea-a-tarea, es decir, una sola tarea escribe al flujo, y una sola tarea lee del flujo. No están hechos para que dos o más tareas escriban, o dos o más tareas lean desde el mismo flujo o mensaje, pero tampoco implementan un mecanismo que lo evite. Tú como programador eres responsable de que eso no pase. Pero si en verdad necesitas que un mismo flujo sea accesado por dos o más tareas, entonces deberás utilizar mecanismos que supervisen que solamente una tarea está accesando al flujo en un determinado momento. En FreeRTOS puedes utilizar los semáforos mutex (los cuales son tema de una próxima lección) para otorgar acceso exclusivo a una tarea a la vez.

Para que puedas utilizar los flujos en tus programas asegúrate que el archivo stream_buffer.c sea parte del proceso de compilación. Este archivo ya está incluído cuando descargas FreeRTOS, y en consecuencia, en el proyecto Molcajete también está donde debe de estar, por lo que no debes procuparte por ello. Así mismo, en cada archivo fuente donde uses a los flujos deberás agregar el encabezado #include <stream_buffer.h>.

También es necesario que la constante INCLUDE_xTaskGetCurrentTaskHandle esté puesta a 1 (en el archivo de configuración FreeRTOSConfig.h):

#define INCLUDE_xTaskGetCurrentTaskHandle 1

Los 3 mecanismos mencionados (flujos, mensajes y colas) necesitan un búfer de datos interno, es decir, un lugar dónde guardar cada byte. Este búfer es un arreglo de elementos uint8_t de un tamaño que tú estableces cuando creas a los objetos. Cuando utilizas creación dinámica de objetos este arreglo se crea de manera automática dentro de la función xStreamBufferCreate(), por lo cual no debes preocuparte de nada (en apariencia. Siempre que te sea posible utiliza la creación estática de objetos, es más segura); mientras que si usas la creación estática de objetos, tú eres responsable de pasarle dicho arreglo a la función xStreamBufferCreateStatic(). A continuación veremos ambas formas.

(Para conocer o recordar la diferencia y la configuración entre objetos dinámicos y estáticos, te recomiendo leer esta lección y esta otra lección, respectivamente. Aunque ambas hablan sobre tareas, la teoría aplica para cualquier tipo de objeto.)

Creación dinámica

Los flujos requieren ser creados, a diferencia de las notificaciones que son parte de las tareas. Como mencioné, podemos crearlos de forma dinámica o estática; veamos primero la forma dinámica:

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. (size_t es un alias para unsigned long int, y lo usamos cuando se trata de tamaños y límites.)

xTriggerLevelBytes. Aquí especificas cuántos bytes se tienen que escribir al flujo para sacar del estado Blocked (es decir, despertar) a la tarea que estuviera esperando por la información. El valor mínimo es 1 y el máximo no puede ser mayor que xBufferSizeBytes. Por ejemplo, si el búfer estuviera vació y tú especificas el valor 10, entonces la tarea consumidora no saldrá del estado Blocked sino hasta que 10 bytes hayan sido escritos. Es posible cambiar este valor con la función xStreamBufferSetTriggerLevel(). Si la cantidad de bytes establecida en esta función no arriban dentro del tiempo establecido en la tarea consumidora, los n bytes que hubieran sido escritos estarán disponibles. En el ejemplo de los 10 bytes, si sólo llegaron 7 antes de que el tiempo de espera de la tarea consumidora expire, entonces ésta tendrá acceso a esos 7 bytes. Para saber cuántos bytes están disponibles usa la función xStreamBufferBytesAvailable().

Valor devuelto. Si hubo memoria suficiente para crear al flujo, entonces la función devuelve el handler; en caso contrario (no hubo memoria suficiente) devuelve NULL. Este handler lo debes de guardar, tanto para que el resto de funciones de flujos sepan sobre cuál van a operar, como para probar que efectivamente fue creado.

Por ejemplo, para crear un flujo de 128 elementos y que la tarea consumidora sea despertada (sacada del estado Blocked) cuando el flujo tenga al menos 12 bytes, el código nos queda así:

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>

StreamBufferHandle_t g_stream_buffer = NULL;
// el handler debe ser global


#define MAX_CAP 128
#define TRIGGER_POINT 12

void setup()
{
   // código

   g_stream_buffer = xStreamBufferCreate( MAX_CAP, TRIGGER_POINT );

   configASSERT( g_stream_buffer );
   // Error creando al flujo

   // más código ...
}

¿Porqué el valor para despertar a la tarea consumidora es 12, mientras que el tamaño máximo es mucho mayor? Que hayan llegado los 12 bytes necesarios no significa que la tarea consumidora los procesará de manera inmediata, peor si ésta es de baja prioridad. Esto es, cuando los 12 bytes hayan llegado al flujo la tarea consumidora pasará del estado Blocked al estado Ready (elegible para ejecutarse), pero quizás no al estado Run (es decir, usar ciclos de CPU) de manera inmediata. Entonces más datos podrían estarse escribiendo en el búfer durante el tiempo que le tome pasar del estado Ready al estado Run. Por esta razón habrá que hacer al búfer un poco más grande (aunque en el ejemplo exageré).

El handler, g_stream_buffer, es global porque debe ser visible tanto en la tarea productora como en la consumidora.

La creación dinámica de objetos no asegura que los objetos se creen, principalmente por escasez de memoria RAM. Por eso siempre deberíamos preguntar si el objeto realmente fue creado, y aunque existen diversas maneras de hacerlo, en este ejemplo escogí la macro configASSERT() que es una versión propia de la conocida assert() de C (en el archivo FreeRTOSConfig.h podrás ver la implementación que utilicé para el projecto Molcajete). Por supuesto que puedes usar al condicional if y tratar de recuperarte sin terminar el programa de manera abrupta.

Creación estática

Ahora veamos cómo crear a un flujo de manera estática. Como es usual, esta forma de creación de objetos necesita más pasos, pero es mucho más segura ya que los objetos siempre serán creados.

Los primeros dos argumentos son idénticos a los de la función xStreamBufferCreate(), así que no los repetiré, y la explicación de los restantes aquí está:

pucStreamBufferStorageArea. Es el búfer interno del flujo donde se guardarán los datos, y no es otra cosa que un arreglo de elementos de tipo uint8_t de tamaño al menos xBufferSizeBytes + 1. Este arreglo deberá existir a lo largo del programa, por lo cual deberás crearlo de manera global o estática a la función donde fue llamada xStreamBufferCreateStatic().

pxStaticStreamBuffer. Esta es la variable que guarda el estado del flujo, es decir, la estructura de datos que maneja al flujo (el flujo es algo más que un simple arreglo). Al igual que el parámetro pucStreamBufferStorageArea deberá existir a lo largo del programa.

Valor devuelto. El handler al flujo. La creación estática de objetos nunca falla, a menos que el arreglo o la variable de estado sean NULL, cosa que tal vez nunca suceda.

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>

StreamBufferHandle_t g_stream_buffer = NULL;
// el handler debe ser global

#define MAX_CAP 128
#define TRIGGER_POINT 12

void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, &consumer_handler );

   static uint8_t stream_buffer_array[ MAX_CAP + 1 ];
   // el array subyacente debe ser un elemento más grande que el parámetro xBufferSizeBytes
   // es static para que el arreglo perdure durante todo el programa sin ser visible al resto de funciones

   static StaticStreamBuffer_t stream_buffer_struct;
   // es static para que la variable perdure durante todo el programa sin ser visible al resto de funciones

   g_stream_buffer = xStreamBufferCreateStatic( 
         MAX_CAP,
         TRIGGER_POINT,
         stream_buffer_array,     // arreglo que hace las veces de búfer interno
         &stream_buffer_struct ); // variable de estado del flujo

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el flujo fue creado o no

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

El tamaño del arreglo subyacente (stream_buffer_array[]), es decir, el búfer donde se guardan los datos, debe ser un elemento más grande que el establecido en el argumento xBufferSizeBytes. En este ejemplo marqué a las variables stream_buffer_array y stream_buffer_struct como static; esto para que perduren durante todo el programa, pero que a la vez sean invisibles al resto de funciones. Una alternativa es que las declares globales, aunque entre menos variables globales tengas en tus programas, mejor. Sin embargo el handler, g_stream_buffer, sí que debe ser global porque tanto la tarea productora como la consumidora deben tener acceso a él. Este es un ejemplo de las veces en que sí son necesarias las variables globales. El prefijo g_ nos recuerda que es una variable global y que debemos tener mucho cuidado con ella.

Escribiendo y leyendo al flujo

Una vez que el flujo ha sido creado ya podemos utilizarlo escribiendo en y leyendo desde él. Los datos que deseas transmitir se deben encontrar en un búfer de tu aplicación, y dependiendo de lo que desees transmitir este búfer puede ser un arreglo o una variable, ambos de tipo char, unsigned char, int8_t, o uint8_t. Recuerda que si quieres transmitir variables de tipos simples o de tipos compuestos, entonces deberías utilizar a los mensajes (tema de la siguiente lección).

La función para escribir en el flujo es:

xStreamBuffer. Es el handler del flujo en el que queremos escribir.

pvTxData. Es un apuntador al búfer que queremos transmitir. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados al flujo.

xTicksToWait. Si no hay espacio disponible para una nueva copia de xDataLengthBytes bytes en el flujo, entonces la tarea productora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1). Si el tiempo hubiese expirado, la función habrá escrito la mayor cantidad de bytes posible.

Valor devuelto. Esta función devuelve el número de bytes escritos al flujo.

La función para leer desde el flujo es:

xStreamBuffer. Es el handler del flujo del que queremos leer.

pvRxData. Es un apuntador al búfer en el que los datos leídos se van a guardar. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados desde el flujo al búfer.

xBufferLengthBytes. Es la cantidad máxima de bytes que deseas leer desde el flujo en una sola llamada. Para cualquier tipo diferente de char deberás multiplicar el número de elementos de ese tipo por el tamaño en bytes que ocupa una variable de ese tipo. Esta función devolverá tantos bytes como sea posible, teniendo como límite xBufferLengthBytes.

xTicksToWait. Es el tiempo de espera máximo que la tarea consumidora estará en el estado Blocked mientras arriban bytes a partir de que el búfer estuviera vacío. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1). Si el tiempo hubiese expirado antes de completar la petición (), la función habrá leído la mayor cantidad de bytes posible.

Valor devuelto. Esta función devuelve el número de bytes leídos desde flujo.

Ejemplo 1

El primer ejemplo que vamos a ver muestra el uso más básico de los flujos: transmitir n bytes de la tarea productora a la consumidora; con n constante. Los flujos no imponen un inicio o un final en la secuencia de bytes, pero en muchas aplicaciones vamos a tomar 10 lecturas y transmitir ese mismo número, sin trucos ni cosas raras.

Con dicha restricción el ejemplo se simplifica ya que no tendremos que verificar si hay espacio en el flujo o no, ni preocuparnos por el punto de disparo (trigger point, en inglés) xTriggerLevelBytes. A éste lo podemos definir como 1 o como n. Recuerda que el punto de disparo es la cantidad de bytes que deben estar en el flujo para que la tarea consumidora sea sacada del estado Blocked. Pero como la copia del búfer de la tarea productora hacia el flujo es muy rápida, en muchas situaciones el resultado será el mismo con el punto de disparo a 1 o a n.

IMPORTANTE: La función de Arduino Serial.print() (y derivados) no son thread-safe; esto es, su comportamiento es miserable en ambientes multitarea (dos tareas tratando de imprimir), dado que Arduino nunca fue pensado para sistemas concurrentes. Esto quedó de manifiesto mientras desarrollaba el ejemplo, al grado que tuve que eliminar prácticamente todas las llamadas.

Mi primer idea para el ejemplo era leer del ADC, quitarle 2 bits a cada lectura (el ADC es de 10 bits) y transmitirlos; sin embargo, para ver que lo transmitido era idéntido a lo recibido debía imprimir los valores tanto en la tarea productora como en la consumidora (en el segundo ejemplo sí que uso al ADC). Pero dado lo que comenté en el recuadro anterior con respecto a Serial.print(), preferí irme por el lado seguro y utilizar un contador ascendente. De esta manera, lo impreso del lado de la tarea consumidora debía ser una secuencia de valores perfectamente definida. Sin más, aquí está el ejemplo:

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>


StreamBufferHandle_t g_stream_buffer = NULL;

#define BUFFER_ELEMS ( 8 )
// número de elementos para cada uno de los búfers del usuario

#define STREAM_ELEMS ( 10 )
// número de elementos en el flujo. Hemos dejado un par de bytes para seguridad

#define STREAM_TRIGGER_POINT ( 8 )
// número de bytes escritos al flujo para que la tarea consumidora sea despertada


void producer_task( void* pvParameters )
{
   uint8_t out_buffer[ BUFFER_ELEMS ];

   uint8_t index = 0;

   uint8_t data = 0;

   while( 1 )
   {
      for( uint8_t i = 0; i < BUFFER_ELEMS; ++i ){
         vTaskDelay( pdMS_TO_TICKS( 50 ) );
       
         out_buffer[ i ] = data++;
      }

      size_t bytes_sent = 
         xStreamBufferSend( 
            g_stream_buffer,
            (void*) out_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 50 ) );

      if( bytes_sent != sizeof( out_buffer ) ){ // timeout:
         Serial.print( "TO(W): " );
         Serial.println( bytes_sent );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   uint8_t in_buffer[ BUFFER_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xStreamBufferReceive(
            g_stream_buffer,
            (void*) in_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 500 ) );

         Serial.print( "RECIBIDOS: " );
         Serial.println( received_bytes );

      if( received_bytes < BUFFER_ELEMS ) { // timeover:
         Serial.println( "TO(R)" );
      } else{

         for( size_t i = 0; i < received_bytes; ++i ){
            Serial.println( in_buffer[ i ] );
         }

      }
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_stream_buffer = xStreamBufferCreate( STREAM_ELEMS, STREAM_TRIGGER_POINT );

   configASSERT( g_stream_buffer );
   // Error creando al flujo

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Una corrida de este ejemplo es:

Salida del programa. La información hasta el símbolo -> es parte de la terminal serial de la IDE de Arduino. Lo importante es la secuencia correcta de valores.

Lo primero que notarás en el ejemplo es que utilicé constantes simbólicas, #define XXX YYY, para establecer algunos tamaños, por seguridad. Imagina que usas números directamente, y luego cambias de 8 elementos a 6, pero se te olvida realizar la actualización en todos los lados donde el tamaño del búfer va a ser utilizado. Eso no es bueno, nada bueno. Establecí el punto de disparo en 8 elementos; esto es, cuando el flujo tenga 8 elementos, entonces sacará del estado Blocked a la tarea consumidora. El valor 1 entrega los mismos resultados.

Ejemplo 2

En este segundo ejemplo veremos algunas cosas interesantes:

  • La creación estática de los flujos.
  • Escribiremos al flujo dato por dato, es decir, la tarea productora no implementa un búfer, como en el ejemplo anterior, sino que una vez generada la información la escribiremos. Esto implica que en el parámetro xTicksToWait de la función xStreamBufferSend() podríamos escribir un tiempo de espera de 0; esto es, la tarea productora no se bloqueará en caso de que el flujo esté lleno, simplemente regresará. Usé 0 para variar al ejemplo.
  • Para este ejemplo sí utilicé un sensor de temperatura. El valor 11 que verás repetido en la imagen de la corrida del programa se corresponde con una temperatura de 21.48 grados.
  • Establecí el punto de disparo (trigger point) a 8. Esto significa que la tarea consumidora será despertada (es decir, sacada del estado Blocked) cada vez que se hayan realizado 8 lecturas del ADC.
  • El ADC del ATmega328 es de 10 bits, pero para meterlo en un dato de 8 bits le quité dos. En el código agregué la fórmula en caso de que quieras imprimir la temperatura en lugar del valor de lectura del ADC.
#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>


StreamBufferHandle_t g_stream_buffer = NULL;

#define BUFFER_ELEMS ( 8 )
// número de elementos para cada uno de los búfers del usuario

#define STREAM_ELEMS ( 10 )
// número de elementos en el flujo. Hemos dejado un par de bytes para seguridad

#define STREAM_TRIGGER_POINT ( 8 )
// número de bytes escritos al flujo para que la tarea consumidora sea despertada

void producer_task( void* pvParameters )
{
   TickType_t xLastWakeTime = xTaskGetTickCount();

   uint8_t data = 0;

   while( 1 )
   {
      vTaskDelayUntil( &xLastWakeTime, pdMS_TO_TICKS( 50 ) );

      data = analogRead( A0 ) >> 2;
      // -- usé el sensor LM35
      // -- le quitamos dos bits a la lectura (el valor es correcto, pero con menos
      // resolución)

      size_t bytes_sent = xStreamBufferSend( g_stream_buffer, (void*) &data, 1, 0 );
      // escribimos un sólo byte y no hay necesidad de esperar

      if( bytes_sent == 0 ){ // timeout:
         Serial.println( "TO(W)" );
      }
   }
}

void consumer_task( void* pvParameters )
{
   uint8_t in_buffer[ BUFFER_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xStreamBufferReceive(
            g_stream_buffer,
            (void*) in_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 500 ) );

         Serial.print( "RECIBIDOS: " );
         Serial.println( received_bytes );

      if( received_bytes < BUFFER_ELEMS ) { // timeover:
         Serial.println( "TO(R)" );
      } 
#if 1
      else{

         for( size_t i = 0; i < received_bytes; ++i ){

            Serial.println( in_buffer[ i ] );

            // si quieres imprimir la temperatura usa la siguiente fórmula:
            // temp = ( in_buffer[ i ] * 500 ) / 256.0;
            // Serial.println( temp );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, NULL );

   static uint8_t stream_buffer_array[ STREAM_ELEMS + 1 ];
   // el array subyacente debe ser un elemento más grande que el parámetro xBufferSizeBytes
   // es static para que el arreglo perdure durante todo el programa sin ser visible al resto de funciones

   static StaticStreamBuffer_t stream_buffer_struct;
   // es static para que la variable perdure durante todo el programa sin ser visible al resto de funciones


   g_stream_buffer = xStreamBufferCreateStatic( 
         STREAM_ELEMS,
         STREAM_TRIGGER_POINT,
         stream_buffer_array,
         &stream_buffer_struct );

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el flujo fue creado o no

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Una corrida de este programa es:

El valor 11 se corresponde a una temperatura de 21.48 grados.

Otras funciones de la API

Hemos estudiado apenas las funciones más útiles de la API, sin embargo quedan algunas que es importante que las conozcas (aquí podrás ver la API completa).

Escritura y lectura desde interrupciones

En lecciones anteriores hablé de funciones espejo que deben ser utilizadas cuando son llamadas desde dentro de una interrupción (ISR); a este tipo de funciones FreeRTOS le llama interrupt safe version. Los flujos tienen dos:

Los tres primeros parámetros y el valor devuelto son lo mismo que sus contrapartes. Pero observa dos diferencias fundamentales:

  • No tienen al parámetro xTicksToWait; esto es así porque una ISR no debe bloquearse jamás.
  • Las funciones ponen a pdTRUE el parámetro pxHigherPriorityTaskWoken si la llamada a éstas implica que una tarea de alta prioridad pasó del estado Blocked al estado Ready y es necesario hacer un cambio de contexto, es decir, pasarla al estado Run.

El siguiente fragmento muestra su posible uso (no lo probé, pero así se usa):

void producer_ISR()
{
   isr_ack();
   // "reconocemos" la interrupción (ack -> acknowledge)


   BaseType_t pxHigherPriorityTaskWoken = pdFALSE;
   // siempre debe ser inicializada a pdFALSE

   static uint8_t data = 0;

   size_t bytes_sent = 
     xStreamBufferSendFromISR( 
        g_stream_buffer,
        (void*) &data,
        1,
        &pxHigherPriorityTaskWoken );

   if( bytes_sent == 0 ){
      // no hubo espacio en el flujo
   }

   if( pxHigherPriorityTaskWoken != pdFALSE ) taskYIELD;
   // cambio de contexto opcional, pero recomendable
}

Funciones auxiliares

La API de los flujos incluye una serie de funciones para preguntar o alterar el estado del flujo:

  • BaseType_t xStreamBufferIsEmpty( StreamBufferHandle_t xStreamBuffer ). Devuelve pdTRUE si el flujo indicado en xStreamBuffer está vacío, o pdFALSE en caso contrario.
  • BaseType_t xStreamBufferIsFull( StreamBufferHandle_t xStreamBuffer ). Devuelve pdTRUE si el flujo indicado en xStreamBuffer está lleno, o pdFALSE en caso contrario.
  • BaseType_t xStreamBufferSetTriggerLevel( StreamBufferHandle_t xStreamBuffer, size_t xTriggerLevel ). Establece el nuevo punto de disparo xTriggerLevel para el flujo indicado en xStreamBuffer. Devuelve pdTRUE si xTriggerLevel es menor o igual al tamaño del flujo, o pdFALSE en caso contrario (el punto de disparo es mayor que el tamaño del flujo).
  • size_t xStreamBufferBytesAvailable( StreamBufferHandle_t xStreamBuffer ). Devuelve el número de elementos actualmente en el flujo indicado por xStreamBuffer que se pueden leer.
  • size_t xStreamBufferSpacesAvailable( StreamBufferHandle_t xStreamBuffer ). Devuelve el número de elementos libres en el flujo indicado por xStreamBuffer, es decir, cuánto espacio queda libre.
  • BaseType_t xStreamBufferReset( StreamBufferHandle_t xStreamBuffer ). Pone a su estado inicial al flujo indicado por xStreamBuffer.

¿Qué sigue?

En esta lección hemos estudiado el primero de 3 mecanismos que FreeRTOS incluye para pasar largas cantidades de información entre tareas, o entre una interrupción y una tarea. Vimos cómo crear flujos de manera dinámica y estática, y también cómo escribir y leer tanto arreglos como datos individuales.

RECUERDA: Siempre que te sea posible utiliza objetos estáticos.

RECUERDA: Si vas a llamar a una función de FreeRTOS desde una interrupción, verifica que ésta sea interrupt safe version, es decir, tenga terminación FromISR.

RECUERDA: Los flujos se utilizan para transmitir y recibir secuencias de bytes. Si tu intención es otra, entonces usa mensajes o colas.

En la siguiente lección estudiaremos a los mensajes, los cuales son una extensión lógica (y muy necesaria) de los flujos. Con los mensajes ya podremos transmitir y recibir datos más grandes que uint8_t, tales como int‘s, float‘s, y tipos compuestos, struct.

(En esta página encontrarás información de suma importancia sobre los flujos, mensajes y notificaciones. Deberías leerla en cuanto te sea posible.)

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Fco. Javier Rodríguez

1 comentario en “Comunicación inter-tareas (I): Flujos (stream buffers)

  1. Pingback: Comunicación inter-tareas (II): Mensajes (message buffers) | Electrónica y Sistemas embebidos

Deja una respuesta