Variables de condición y monitores, I

Esta entrada la quiero presentar a través de dos ejemplos y algunas preguntas.

Imagina un sistema concurrente o multitarea con sistema operativo, como muchos de los que hemos estado escribiendo. Imagina que tienes una estructura de datos Cola (Queue en inglés) y que es compartida por varias tareas, es decir, dos o más tareas tratarán de escribir o leer de ella. Los problemas que se presentan son:

  1. ¿Qué haces si la cola está vacía y una o más tareas necesitan leer de ella?
  2. ¿Qué haces si la cola está llena y una o más tareas necesitan escribir en ella?
  3. ¿Qué haces si mientras una tarea está escribiendo en la cola otra quiere leer de ella «al mismo tiempo»?

Este otro ejemplo es clásico en el tema que vamos a tratar hoy. Imagina un sistema bancario que puede recibir depósitos y puede entregar dinero en efectivo. Varias personas pueden depositar, mientras que una o más pueden retirar.

  1. ¿Qué haces si dos personas quieren depositar al mismo tiempo?
  2. ¿Qué haces si dos personas quieren retirar al mismo tiempo?
  3. ¿Qué haces si mientras una persona está depositando otra quiere retirar, o viceversa?
  4. ¿Qué haces si quieres retirar una cantidad, pero el saldo no es suficiente?

El punto en común de ambos ejemplos es que hay un recurso compartido que por un lado de ninguna manera debe ser accesado al mismo tiempo, so pena de corromperlo, y por otro lado tiene tiempos de inactividad en que el solicitante debe esperar (no hay datos para desencolar, la cola está llena, saldo insuficiente para cubrir el retiro, etc).

Una solución es que si el recurso está siendo utilizado, los solicitantes se agreguen a una lista de espera y se vayan a dormir. En cuanto el recurso es liberado, uno de los solicitantes en la lista de espera es sacado de la misma y despertado para que use al recurso. A esta solución le vamos a llamar Variable de condición (en inglés, Condition variable).

Y por otro lado tenemos a los monitores. Un monitor es una construcción (en forma de tipo abstracto o clase) que encapsula a un recurso compartido, administra el acceso a éste a través de un semáforo mutex, y presenta un conjunto de operaciones al cliente del mismo. El recurso sólo puede ser utilizado a través de las operaciones públicas, es decir, el recurso no puede ser usado de maneras que no hayan sido previstas; además, es el monitor el que se encarga de la administración del mutex, no el usuario del recurso. Un monitor puede usar o no variables de condición.

En esta primera parte del artículo de hoy voy a mostrarte una implementación de variable de condición para un microcontrolador con núcleo ARM Cortex-M3 y el sistema operativo FreeRTOS. Por supuesto que las ideas expuestas, así como el código, los podrás trasladar a otras arquitecturas y sistemas operativos con un mínimo de esfuerzo. En la segunda parte te mostraré cómo implementar un monitores con y sin variables de condición.

Tabla de contenidos

¿Qué es y qué necesito?

Un semáforo mutex (de aquí en adelante solamente ‘mutex’) puede implementar exclusión mutua simple; esto es, nada más una tarea puede accesar a un recurso compartido (o estar activa dentro de una región crítica) en un tiempo dado. Para que otra tarea accese al recurso, la tarea que tenía el mutex lo debe devolver. Esta situación hace imposible que la tarea que tiene al mutex, y por lo tanto al recurso compartido, sea suspendida y despertarla después cuando una condición se cumpla. (En mi curso de Arduino en tiempo real escribí esta entrada sobre los mutex’es.)

En los ejemplos que dí al principio lo podemos observar: una tarea que quiere escribir a la cola ya tiene el mutex, es decir, ya tiene acceso a la cola, pero ésta está llena, y por lo tanto no se puede escribir en ella, ¿qué debe hacer la tarea que tiene el acceso a la cola y quiere escribir? Puede hacer dos cosas:

  1. Devolver el mutex y salir para después tentar a la suerte e intentar adquirir al mutex (y al recurso) cuando se haya hecho algo de espacio, o
  2. Ser insertada en una cola de espera, irse a dormir, y después ser desencolada de manera automática tan pronto se haya hecho algo de espacio en la cola.

El punto 2 es la definición de una variable de condición: una tarea a la espera del mutex es encolada y suspendida, y cuando la condición (en este ejemplo, hay espacio en la cola) se cumple, la tarea es desencolada, despertada, adquiere el mutex y se pone a trabajar. La cola de espera permite que varias tareas estén suspendidas (dormidas) a la espera del mutex, y cada una lo obtendrá tan pronto la condición se cumpla.

Una variable de condición es un tipo abstracto que presenta, al menos, las siguientes operaciones:

  • CWait(). Si el mutex no está disponible, entonces la tarea que hizo la llamada a esta función se encola y se suspende.
  • CNotify(). Avisa que la condición se cumplió, y por lo tanto, la tarea que lleva más tiempo en la cola de espera es despertada y puesta a trabajar. Si no hubiera ninguna tarea esperando, entonces no se hace nada.

(La letra C en los nombres anteriores es de Condition, pero es lo de menos.)

En su forma más básica los atributos de este tipo abstracto es una estructura de datos Cola, es decir, el primer elemento en haber entrado (el más antiguo) será el primero en salir (en inglés, FIFO: First-in, First-out). Aunque el mutex puede ser parte del mismo, la mayoría de implementaciones que he visto lo manejan de forma externa, es decir, son parte de la lista de argumentos de la función CWait(). Quizás más adelante modifique (o mejor aún, tú modifiques) la clase CondVar para que también provea al mutex.

Así mismo, además de las dos funciones básicas mencionadas, también se podrían incluir las siguientes tres:

  • CWaitFor(). Igual que CWait(), pero si no obtiene al mutex antes de que el tiempo (relativo a cuando se hizo la llamada) establecido expire sale y devuelve el valor false.
  • CWaitUntil(). Igual que CWait(), pero si no obtiene al mutex antes de que el tiempo (absoluto con referencia a una marca de tiempo en el pasado) establecido expire sale y devuelve el valor false.
  • CNotifyAll(). Igual que CNotify(), pero despierta a todas las tareas que estuvieran en la cola de espera.

El ejemplo que te voy a mostrar más adelante incluye las funciones CWait() y CNotify(). Y hablando de ejemplos, antes de ver los pseudo códigos correspondientes creí necesario que supieras que tanto las variables de condición, como los monitores (que veremos en la segunda parte), pueden ser codificados de manera procedimental (en lenguaje C), u orientada a objetos (con clases en lenguaje C++, Java, C#, etc). El ejemplo te lo voy a mostrar en C++ con objetos, pero verás que no tendrás ningún problema es hacerlo en C plano.

Operación CWait

El núcleo de una variable de condición es la operación CWait(), cuyo pseudo código es:

CWait( mutex )
{
    Inserta a la tarea en la cola de espera
    Devuelve el mutex
    La tarea se suspende

    // esta es la instrucción que la tarea ve cuando es despertada:
    Adquiere al mutex
}

Ojalá su implementación fuera tan simple como su pseudo código. Nota que cuando la tarea es despertada lo primero que hace es intentar adquirir al mutex de nuevo (en la línea 4 lo devolvió).

Ahora, entre el momento de que la tarea fue despertada y el momento en que obtuvo al mutex de regreso, muchas cosas pudieron haber sucedido, entre ellas que la condición dejara de cumplirse. Sí, podría pasar. Por ejemplo, si la tarea estaba esperando un espacio para escribir, pero antes de obtener nuevamente el mutex otra tarea llenó la cola, entonces debe volver a esperar. Esta condición se puede manejar así:

Mutex mutex

una_tarea()
{
    Mientras 1 == 1
    {
        Obtiene al mutex

        Mientras la condición no se cumpla
        {
            CWait( mutex )
        }

        // usa al recurso 

        Devuelve al mutex
    }
}

¿Y cómo sabe que la condición se cumplió? ¡Ah! Aquí es donde la función CNotify() entra a escena.

Operación CNotify()

Cuando el código del cliente de la variable de condición detecta que la condición se ha cumplido, entonces manda llamar a CNotify(), cuyo pseudo código es:

CNotify()
{
    Si hay elementos en la cola de espera
    {
        Desencola a la tarea que ha esperado por más tiempo
        La despierta
    }
}

Que una tarea sea despertada no significa que vaya a ser ejecutada inmediatamente; el sistema operativo la pone como candidata a ser ejecutada (es decir, en la lista de tareas Ready) y se ejecutará cuando le corresponda.

¿Cómo se usan ambas funciones, CWait() y CNotify()? Veámoslo en un pseudo código:

una_tarea()
{
    Mientras 1 == 1
    {
        Obtiene al mutex

        Mientras la condición no se cumpla 
        {
            CWait( mutex )
        }

        // usa al recurso 

        Devuelve al mutex
    }
}
    
otra_tarea()
{
    Mientras 1 == 1
    {
        Si la condición se ha cumplido 
        {
            CNotify()
        }
    }
}

La función otra_tarea está revisando el cumplimiento de la condición, y cuando ésta ha sido cumplida, manda llamar a CNotify(), la cual despierta, de existir, a la tarea que más tiempo ha estado esperando. Como la condición ya fue cumplida, el ciclo wait deja de ejecutarse y la tara una_tarea puede accesar al recurso.

Desarrollo

Una vez que ya tenemos una idea de qué son y cómo se usan las variables de condición, es momento de aterrizar los conceptos. Los libros e Internet están llenos de teoría, y me costó mucho trabajo encontrar ejemplos concretos. Por eso, además de realizar la implementación de este tipo de mecanismos, es que quise compartirla contigo por si en algún momento tienes que usarlas.

Función CWait() y compañía

La función CWait() en C/C++, y utilizando al sistema operativo FreeRTOS, queda de la manera siguiente. Como mencioné, si estás utilizando C plano o algún otro sistema operativo me parece que no tendrás problemas en adaptarlo. Al final del artículo te pasaré el repositorio donde podrás descargar el código completo para el microcontrolador LPC1549 de 32 bits, de núcleo Cortex-M3.

   bool CWait( SemaphoreHandle_t& mutex, TickType_t ticks = portMAX_DELAY )
   {
      TaskHandle_t self;

      self = xTaskGetCurrentTaskHandle();
      // obtiene la tarea actual

      taskENTER_CRITICAL();
      {
         // guarda la tarea en la cola de espera:
         configASSERT( this->len < this->max );
         this->queue[ this->tail++ ] = self;
         if( this->tail == this->max ) this->tail = 0;
         ++this->len;

      } taskEXIT_CRITICAL();

      xSemaphoreGive( mutex );
      // devuelve el mutex

      vTaskSuspend( self );
      // la tarea se auto suspende
     
      return xSemaphoreTake( mutex, ticks );
      // intenta reobtener al mutex
   }

Nota que el código que tiene que ver con guardar la tarea (en realidad, su handler o descriptor) en la cola de espera está dentro de una sección crítica. No queremos que sucedan los problemas que mencioné en un principio, es decir, que mientras una tarea está intentando insertarse en la cola, otra, quizás de mayor prioridad, le gane. Usé una sección crítica para que la operación de inserción (y más adelante, la de extracción) fuese atómica, pero también es posible utilizar un mutex dedicado a la variable de condición con el mismo propósito. De hecho muchos libros lo hacen así (en la teoría). En un futuro cambiaré la sección crítica por un mutex dedicado.

CWait() espera de manera indefinida para obtener el mutex. Su argumento ticks indica que espere por siempre (gracias al valor por defecto portMAX_DELAY); esto es como si el argumento no existiera, y así debe ser para esta función. Sin embargo, las funciones CWaitFor() y CWaitUntil() sí que lo usan:

   bool CWaitFor( SemaphoreHandle_t& mutex, TickType_t ticks )
   {
      return CWait( mutex, ticks );
   }

   bool CWaitUntil( SemaphoreHandle_t mutex, TickType_t& last, TickType_t ticks ) 
   {
      TickType_t now = xTaskGetTickCount();

      TickType_t until = last + ticks - now;
      // los overflows se manejan de manera automática

      last = until;

      return CWait( mutex, until );
   }

Si estás familiarizado con la función de FreeRTOS vTaskDelayUntil(), no tendras problema para entender a la función CWaitUntil() que funciona de manera similar: el tiempo establecido es absoluto, es decir, está referido a un punto establecido en el pasado, el cual se actualiza antes de salir de la función. Con esto evitas el desplazamiento de las llamadas en la línea de tiempo. Sin embargo, si tu aplicación no tiene problemas con ello, entonces puedes usar CWaitFor(), la cual establece un tiempo de salida relativo a cuando fue llamada, similar a la función de FreeRTOS vTaskDelay(). En esta lección de mi curso gratuito Arduino en tiempo real explico las diferencias y los usos de ambas funciones, vTaskDelay() y vTaskDelayUntil().

Función CNotify() y compañía

La función CNotify() en C/C++, y utilizando al sistema operativo FreeRTOS, queda de la manera siguiente. Es más fácil que su contraparte CWait(), pero igual de importante, ya que es ésta la que vuelve a la vida a las tareas para que continuen con su trabajo:

   void CNotify()
   {
      TaskHandle_t task = nullptr;

      taskENTER_CRITICAL();
      {
         if( this->len > 0 ){
            task = this->queue[ this->head++ ];
            if( this->head == this->max ) this->head = 0;
            --this->len;
         }
      } taskEXIT_CRITICAL();

      if( task != nullptr ) vTaskResume( task );
   }

RECUERDA: Hablando de la misma variable de condición, una tarea llama a CWait(), mientras que otra debe llamar a CNotify(). En otras palabras, las tareas consumidoras llaman a CWait(), mientras que las tareas productoras llaman a CNotify().

La sección crítica evita que si dos tareas llaman a CNotify() al mismo tiempo interfieran entre sí y corrompan a la cola de espera.

CNotify() despierta a la tarea que ha esperado por más tiempo, de haberla. La función CNotifyAll() despierta a todas:

   void CNotifyAll()
   {
      taskENTER_CRITICAL();
      {
         while( this->len ) CNotify();
      } taskEXIT_CRITICAL();
   }

Ejemplo

Ahora que conocemos qué son, qué hacen y cómo se implementan las variables de condición, ya podemos ver un ejemplo simple. El recurso compartido será una cola de caracteres, y habrá una función put() que escriba en ella y una función get() que lea de ella. Para poder escribir es necesario que haya espacio, y para poder leer es necesario que hayan caracteres disponibles. Para solucionar este problema vamos a usar dos variables de condición, una por cada condición (¡Wow!)

Primero declaramos las dos variables de condición:

ConditionV<4> data_avail;
// Variable de condición que indica que hay caracteres disponibles para leer

ConditionV<4> space_avail;
// Variable de condición que indica que hay espacio disponible en la cola de caracteres

El valor 4 indica el máximo número de tareas que la cola de espera de la variable de condición podrá guardar. En tu aplicación tú decides si son más o son menos.

La función put() queda así:

void put( char c )
{
   xSemaphoreTake( mutex, portMAX_DELAY );
   // adquiere al recurso

   while( len >= max ) space_avail.CWait( mutex );
   // la tarea se bloquea si la cola de caracteres está llena
   // la función get() realiza la notificación

   // escribe un carácter en la cola:
   buf[ tail++ ] = c;
   if( tail >= max ) tail = 0;
   ++len;

   data_avail.CNotify();
   // avisa que hay al menos un carácter listo para ser leído,
   // la usa la función get()

   xSemaphoreGive( mutex );
   // devuelve al recurso
}

La función get() es:

char get()
{
   xSemaphoreTake( mutex, portMAX_DELAY );
   // adquiere al recurso

   while( len == 0 ) data_avail.CWait( mutex );
   // la tarea se bloquea si la cola de caracteres está vacía
   // la función put() realiza la notificación

   // lee un carácter de la cola:
   char c = buf[ head++ ];
   if( head >= max ) head = 0;
   --len;

   space_avail.CNotify();
   // avisa que hay al menos un espacio para escribir,
   // la usa la función put()

   xSemaphoreGive( mutex );
   // devuelve al recurso

   return c;
   // devuelve al carácter
}

Las tareas productoras y consumidoras son las siguientes (en realidad es una de cada una, pero las reutilicé) :

void producer( void* pvParameters )
{
   uint32_t offset = (uint32_t)pvParameters;
   char cont = 0;

   char* task_name = pcTaskGetName( NULL );

   char letter = offset;

   while( 1 )
   {
      put( letter++ );
      // intenta escribir un carácter en la cola

      ++cont;
      if( cont == 10 ){
         cont = 0;
         letter = offset;
      }

      vTaskDelay( pdMS_TO_TICKS( ( rand() % 50 ) + 20 ) );
   }
}

void consumer( void* pvParameters )
{
   char* task_name = pcTaskGetName( NULL );

   while( 1 )
   {
      char c = get();
      // intenta leer un carácter de la cola

      vTaskDelay( pdMS_TO_TICKS( ( rand() % 150 ) + 25 ) );
   }
}

Quizás notaste que los tiempos que ambas tareas establecen para irse dormir (y así liberar la CPU) son aleatorios. Esto no es requisito para las variables de condición. Lo hice así con el fin de probar el código en condiciones cercanas a la realidad.

Para completar el ejemplo veamos la creación de las tareas:

   xTaskCreate( producer, "P0", 128,  (void*) 'A', tskIDLE_PRIORITY + 1, NULL ); // prints out: ABC...J
   xTaskCreate( producer, "P1", 128,  (void*) 'a', tskIDLE_PRIORITY + 1, NULL ); // prints out: abc...j
   xTaskCreate( producer, "P2", 128,  (void*) '0', tskIDLE_PRIORITY + 1, NULL ); // prints out: 012...9

   xTaskCreate( consumer, "C0", 128, NULL, tskIDLE_PRIORITY, NULL );
   xTaskCreate( consumer, "C1", 128, NULL, tskIDLE_PRIORITY, NULL )

Código fuente

El código fuente completo de este artículo lo puedes descargar o clonar desde este commit del repositorio del proyecto; pero si quieres obtener la última versión completa, incluyendo al monitor (tema de la segunda parte), lo puedes hacer desde aquí.

¿Qué sigue?

Hoy vimos qué son y cómo se implementan las variables de condición, herramientas muy útiles en la programación multi tarea.

Utilicé secciones críticas en las funciones CWait() y CNotify(). Una sección crítica detiene a todo el sistema porque desactiva las interrupciones. Una mejora, como lo mencioné, sería agregar un mutex dedicado a la variable de condición en lugar de las secciones críticas. Con esto logramos que el sistema siga funcionando independiente de las variables de condición.

Otra modificación muy interesante de cara a los sistemas de tiempo real es cambiar la cola de espera, que es circular, por una cola de espera doble, o una cola de prioridad, de tal manera que las tareas con prioridad alta sean las primeras en ser despachadas. No es fácil tomar la decisión ya que las tareas de alta prioridad podrían quedarse con la CPU y nunca prestársela a las tareas con baja prioridad que estuvieran esperando por el recurso. De cualquier manera vale la pena realizar el análisis y la modificación de la misma. O en su defecto, tener dos versiones de variables de condición: modo «normal» y modo «tiempo real».

En la segunda parte abordaré el tema de los monitores, los cuales, como también mencioné, pueden o no usar variables de condición.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Cómo notificar a una tarea en FreeRTOS desde dos o más tareas

En esta lección de mi curso gratuito Arduino en tiempo real te mostré cómo utilizar un mecanismo para enviar notificaciones directas de tarea a tarea. En esta nota especial te mencioné que FreeRTOS incluyó un arreglo de notificaciones independientes en cada tarea, lo que significa que varias tareas podrían notificar a una misma (en teoría).

Aunque la documentación oficial y sus ejemplos no muestra cómo lograr esto último, me puse a pensar y llegué a una posible solución. Debo añadir que ni en el foro oficial de FreeRTOS saben para qué sirve tener un arreglo de notificaciones.

Por otro lado, en la nota especial señalé que por alguna razón las notificaciones indexadas (así le llama FreeRTOS a tener un arreglo de notificaciones) no funcionan, por lo que el desarrollo que te mostraré está basado en el microcontrolador de 32 bits LPC1549 y su herramienta de desarrollo gratuita LPCXpresso. Por supuesto que lo que mencionaré aplica para casi cualquier otro microcontrolador, excepto el ATmega328. Por esta razón no he puesto la presente entrada en el curso, ya que al no utilizar al chip ATmega328, no aplica.

Tabla de contenidos

¿Qué es y qué necesito?

Notificaciones directas

Las notificaciones directas a la tarea (direct to task notifications) de FreeRTOS son un mecanismo que permite que una tarea envíe un dato de 32 bits a otra tarea. Así de simple. El valor recibido puede tomar varias interpretaciones, desde un dato puro hasta como semáforos binarios o contadores. Algunas ventajas de este mecanismo es que consume pocos o casi nada de recursos, y además está integrado al sistema operativo, por lo que una tarea puede bloquearse mientras la notificación le llega; de ese modo la tarea bloqueada no consume ciclos de CPU.

Para más detalles de qué son y cómo se configuran por favor visita esta y esta lecciones.

Terminología

Tarea productora. Es una tarea que genera información para enviársela a otra tarea.

Tarea consumidora. Es una tarea que consume la información que otras tareas han generado

Teoría

La idea detrás del desarrollo que estoy a punto a describirte es que cada tarea productora escribirá su información en una entrada específica del arreglo de notificaciones; y además la tarea consumidora tendrá una entrada extra para recibir notificaciones. Esto es, cada vez que una tarea productora escriba en la entrada correspondiente, también deberá avisar que lo hizo señalizando en dicha entrada extra. Por ejemplo, si tu aplicación tiene 3 tareas productoras, entonces utilizarás 4 entradas en el arreglo de notificaciones.

La tarea consumidora será desbloqueada cada vez que una tarea productora se lo notifique. Luego leerá la información que le fue transmitida y volverá al estado de bloqueo.

PARA RECORDAR: FreeRTOS incluye dos mecanismos para transferir flujos y mensajes de información (Streams buffers y Message buffers, respectivamente). Éstos utilizan a la entrada 0 del arreglo de notificaciones, en caso de que actives a cualquiera de los dos. Entonces, si tu aplicación tiene 3 tareas productoras, utilizarás 4 entradas en el arreglo de notificaciones, pero deberías declarar 5:

/* Archivo de configuración FreeRTOSConfig.h */

#define configUSE_TASK_NOTIFICATIONS 1
// avisas que quieres utilizar notificaciones directas

#define configTASK_NOTIFICATION_ARRAY_ENTRIES   5
// avisas que cada tarea tendrá un arreglo de 5 entradas

Derivado de la configuración anterior podrás notar que hay un poco de desperdicio de memoria, ya que si tu aplicación tiene 7 tareas, entonces cada una de ellas tendrá un arreglo de 5 elementos para las notificaciones, en total 35 elementos de 32 bits cada uno, de los cuales ocuparás 5 (para este ejemplo). Tómalo en cuenta para que tengas cuidado cuando establezcas el número de entradas. Si no planeas utilizar los streams ni los messages, podrías, para este ejemplo, indicar 4 entradas.

Desarrollo

Para mostrarte cómo dos o más tareas pueden notificar a una misma, en este ejemplo vamos a tener dos tareas productoras y una consumidora, y no vamos a usar streams ni messages, por lo que nuestra configuración quedaría así:

/* Archivo de configuración FreeRTOSConfig.h */

#define configUSE_TASK_NOTIFICATIONS          1
#define configTASK_NOTIFICATION_ARRAY_ENTRIES 3

Como ya te decía, cada tarea productora deberá notificar la información y una notificación. La siguiente tarea simplemente incrementa en dos un valor y se lo transmite a la tarea consumidora. Por supuesto que tus tareas podrían realizar algo mucho más complejo, incluyendo notificar desde interrupciones:

void Producer1_task( void* pvParameters )
{
   pvParameters = pvParameters;

   uint32_t cont = 1;

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( ( rand() % 100 ) + 100 ) );

      xTaskNotifyIndexed(
    		  consumer_h,            // handler de la tarea consumidora
			  1,                      // índice en el arreglo de notificaciones de la tarea consumidora para recibir la información
			  (uint32_t) cont,        // información que queremos transmitir
			  eSetValueWithOverwrite  // sobreescribe la información si la anterior no fue leída
	  );

      xTaskNotifyIndexed(
           consumer_h,             // handler de la tarea consumidora
			  0,                      // índice en el arreglo de notificaciones de la tarea consumidora para recibir "notificaciones"
			  ( 1 << 1 ),             // el bit 1 (0x01) corresponde a esta tarea
			  eSetBits                // hace una OR entre el valor 0x01 y lo que ya estuviera en el campo de notificaciones
	  );

      // tu proceso...

      cont += 2;

   }
}

Las tareas productoras necesitan conocer el handler de la tarea consumidora, consumer_h. Este handler lo puedes pasar como argumento a la tarea, o declararlo global. En este ejemplo, como verás más adelante, me fui por el camino fácil y poco recomendado: lo declaré global.

Así mismo, las tareas productoras tienen mayor prioridad que la tarea consumidora. Veamos el código de la otra tarea productora:

void Producer2_task( void* pvParameters )
{
	pvParameters = pvParameters;

   uint32_t cont = 2;

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( ( rand() % 100 ) + 125 ) );

      xTaskNotifyIndexed(
    		  consumer_h,             // handler de la tarea consumidora
			  2,                      // índice en el arreglo de notificaciones de la tarea consumidora para recibir la información
			  (uint32_t) cont,        // información que queremos transmitir
			  eSetValueWithOverwrite  // sobreescribe la información si la anterior no fue leída
	  );

      xTaskNotifyIndexed(
           consumer_h,             // handler de la tarea consumidora
			  0,                      // índice en el arreglo de notificaciones de la tarea consumidora para recibir "notificaciones"
			  ( 1 << 2 ),             // el bit 2 (0x02) corresponde a esta tarea
			  eSetBits                // hace una OR entre el valor 0x02 y lo que ya estuviera en el campo de notificaciones
	  );

      // tu proceso...

      cont += 2;
   }
}

Ambas tareas son muy parecidas, pero lo hice así para mantener simple al ejemplo y concentrarme en la finalidad del mismo.

Después tenemos a la tarea consumidora, la cual como ya dije, se bloquea a la espera de ser notificada:

void Consumer_task( void* pvParameters )
{
   uint32_t p1_value;
   uint32_t p2_value;

   uint32_t         notified_value;
   const TickType_t max_block_time = pdMS_TO_TICKS( 2000 );

   while( 1 )
   {
      BaseType_t 
      ret_val = xTaskNotifyWaitIndexed(
         0,                       // índice donde recibe las notificaciones
         0,                       // no pone a cero ningún bit a la entrada a la función
         ( 0x01 | 0x02 ),         // pone a cero los bits 1 y 2 a la salida de la función
         &notified_value,         // en esta variable guarda quién lanzó las notificaciones (se usa más adelante)
         max_block_time );        // tiempo de espera antes de indicar error

      if( ret_val != pdFALSE ){   // las notificaciones se entregaron antes de que el tiempo terminara
         
         if( notified_value & 0x01 ){ // la tarea Producer1_task puso una notificación
            xTaskNotifyWaitIndexed( 
                  1,                  // en esta entrada está la información
                  0,                  // no pone a cero ningún bit a la entrada a la función
                  0xffffffff,         // pone el campo de información a 0 a la salida de la función
                  &p1_value,          // en esta variable guarda la información
                  0 );                // no espera ya que sabemos que hay información lista para ser leída

            // tu proceso ...

            DEBUGOUT( "%d\n\r", p1_value ); // imprime el dato
            Board_LED_Toggle( RED );
         }

         if( notified_value & 0x02 ){ // la tarea Producer2_task puso una notificación
            xTaskNotifyWaitIndexed( 
                  2,                  // en esta entrada está la información
                  0,                  // no pone a cero ningún bit a la entrada a la función
                  0xffffffff,         // pone el campo de información a 0 a la salida de la función
                  &p2_value,          // en esta variable guarda la información
                  0 );                // no espera ya que sabemos que hay información lista para ser leída

            // tu proceso ...

            DEBUGOUT( "%d\n\r", p2_value ); // imprime el dato
            Board_LED_Toggle( BLUE );
         }
      }
   }
}

Cuando cualquiera de las tareas productoras «avisa» que hay un dato listo, esta función sale del estado Block, pero como son dos o más tareas las que pueden despertarlo, debe verificar, una a una, la tarea que hizo la notificación revisando su respectivo bit. Una vez que la ha determinado, obtiene de manera inmediata la información. Dado que ya sabemos que hubo un dato, no hay necesidad de establecer un tiempo de espera, por eso el argumento timeout vale 0.

¿Qué sigue?

Hoy hemos visto una forma de utilizar al arreglo de notificaciones para que dos o más tareas notifiquen a una misma. Esta idea la podemos extender a interrupciones (ISRs) para que sea una tarea, y no las propias ISRs, quienes procesen la información (a este procedimiento le llamamos diferred tasks, o en español, tareas diferidas. Escribí un artículo sobre esto, en inglés, aquí en mi blog alternativo).

Ten en cuenta que este procedimiento es útil y ahorra muchísimos recursos, pero si tus tareas deben transferir grandes cantidades de información quizás debas considerar la utilización de otros mecanismos como los streams, los messages, o las colas.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Hooks útiles de FreeRTOS y un watchdog por software

Mucho, si no es que casi todo el trabajo que hacemos, lo llevamos a cabo dentro de tareas e interrupciones, en lo que le llamaríamos la capa de usuario. Sin embargo, FreeRTOS nos ofrece la posiblidad de hacer cosas simples como si nuestro código fuese parte de él, ejecutándose en la capa del sistema operativo.

Imagina que debes implementar un LED que parpadée a intervalos regulares para indicarte que el sistema está «vivo» (a esta actividad le llamamos heartbeat, o en español, latido del corazón). Si el LED no parpadea, o dejó de hacerlo después de un tiempo, significa que algo malo pasó dentro del sistema. Para implementarlo puedes hacerlo de dos formas con lo que conocemos hasta el momento:

  1. Una tarea dedicada al heartbeat, o
  2. Un software timer con una callback que lleve a cabo el latido del corazón.

Ambas soluciones tienen el problema del alto consumo de recursos para una tarea tan simple. ¿Qué podríamos hacer? Qué tal que le decimos al sistema operativo que él mismo mande llamar a nuestra función heartbeat. ¡Excelente!

FreeRTOS nos provee dos maneras de hacerlo:

  • Que FreeRTOS llame a nuestra función heartbeat() cada vez que el sistema esté desocupado, o
  • Que FreeRTOS llame a nuestra función heartbeat() en cada tick del sistema.

Cuál escoger depende totalmente de la aplicación que estés escribiendo, y por supuesto que puedes implementar muchas cosas diferentes a un heartbeat.

Con este par de mecanismos no tendrás que crear tareas extra o usar a los software timers para algo tan sencillo. Eso sí, el código de heartbeat(), o cualquier otro que inyectes a FreeRTOS, tiene que ser simple y no debería llamar a ninguna función bloqueante.

Una aplicación muy útil que podemos inyectar al sistema para que se ejecute cada vez que el sistema está desocupado es la de un watchdog por software. Un watchdog se encarga de supervisar al sistema; si una tarea crítica deja de responder, entonces el sistema completo es reiniciado.

En la lección de hoy vamos a recordar qué son las callbacks, cómo usarlas en cada tick del sistema o cada vez que éste tenga tiempo libre, y cómo implementar un watchdog por software básico.

Tabla de contenidos

¿Qué es y qué necesito?

Callbacks

Una función callback es una función tuya que se la pasas a una función de un sistema independiente (o aparte del tuyo) como parte de una lista de argumentos. Esta función del sistema entonces ejecuta tu función:

int fn( int v );
void funcion_del_sistema( int a, int (*pf)(int) )
{
	pf( a );
}


int callback( int v )
{
	return v+1;
}

int main()
{
	funcion_del_sistema( 5, callback );
}

La función funcion_del_sistema() no es tuya, es parte de un sistema más grande, por ejemplo, FreeRTOS o un administrador de ventanas en un sistema gráfico. Pero tú le pasas código tuyo inyectándole la función callback() la cual sí es tuya (le puse ese nombre para que la identificaras). La función funcion_del_sistema() la recibe como parte de sus argumentos y le da el nombre de pf(). Cuando funcion_del_sistema() se encuentra pf( a ) manda llamar a callback( 5 ). Esta es una forma de extender la funcionalidad de un sistema sin modificar su código fuente.

Para que las callbacks funcionen sus firmas deben de corresponder. Por ejemplo, la función funcion_del_sistema() puede recibir cualquier función que devuelva un entero y reciba un sólo argumento y éste sea entero, y sería un error de compilación si le quieres pasar una función que devuelve un float o que recibe dos argumentos, etc.

Hooks

Una función hook (en español, ancla) sirve para lo mismo que una callback, extender la funcionalidad del sistema, pero se implementa diferente. Una función del sistema espera que exista una cierta función y la manda llamar; esto es, no se inyecta:

int hook( int );
void funcion_del_sistema( int a )
{
	hook( a );
}


int hook( int v )
{
	return v+1;
}

int main()
{
	funcion_del_sistema( 5 );
}

Para que este mecanismo funcione debes nombrar a la hook exactamente como el sistema lo espera, y además debe estar visible a la hora de compilar. Del código anterior podrás ver que la función del sistema otra_fun ya no tiene el parámetro para recibir una función, y que dentro manda llamar a una función con un nombre particular, en este ejemplo, hook(). Por lo tanto tú debes escribir una función con la misma firma y el mismo nombre, y además, como mencioné, tu función deberá estar visible al compilador.

Lo que vamos a ver hoy requiere de funciones hook.

Idle task

La idle task (se pronuncia más o menos, [áidel task]) es una función que crea FreeRTOS de manera automática con la prioridad más baja (¿recuerdas la constante tskIDLE_PRIORITY que usamos en la creación de tareas?) y que se ejecuta únicamente cuando no hay tareas de mayor prioridad activas. Esto es, en FreeRTOS siempre habrá una tarea ejecutándose cuando todas las demás estén bloqueadas.

Un uso que FreeRTOS le da a esta tarea, que ya había mencionado en otra lección, es devolver la memoria indicada por una llamada a la función free() en algunos de los esquemas de memoria dinámica (heap_2.c, heap_4.c, y heap_5.c). De manera conveniente FreeRTOS abrió una puerta para que nosotros los programadores tengamos la posibilidad de ejecutar código cada vez que la idle task se ejecuta.

RECUERDA: Tu código debe ser simple y no debe bloquear a la idle task. Así mismo toma en cuenta que esta tarea podría no ejecutarse contínuamente, ya que si la carga de trabajo es alta, entonces ésta, y en consecuencia tu código, se estará ejecutando esporádicamente.

Idle hook

La idle task siempre está en estado Ready (y cuando hay tiempo en el sistema pasa al estado Running), ya sea que se ejecute o no periódicamente. Para que tú inyectes tu código debes realizar cuatro cosas:

1. En el archivo FreeRTOSConfig.h asegúrate que la constante configUSE_IDLE_HOOK está puesta a 1:

#define configUSE_IDLE_HOOK 1

2. (Opcional) La idle task es a final de cuentas una tarea, por lo cual requiere de una pila. El tamaño mínimo de ésta ya viene especificado, pero si tu código necesitase más memoria, entonces debes aumentar su tamaño. En el mismo archivo FreeRTOSConfig.h puedes modificar su valor:

#define configIDLE_STACK_SIZE ( ( UBaseType_t ) 192 )
#define configIDLE_TASK_NAME  "IDLE" // por si quisieras cambiarle el nombre

3. La hook de la idle task se llama vApplicationIdleHook(). Ésta la puedes colocar donde quieras; en el proyecto Molcajete decidí poner todas las hooks en un mismo lugar (aunque puede ser diferente, por supuesto). Abre el archivo hooks.cpp (revisa la lección anterior para más detalles) y ubica a la función:

#if configUSE_IDLE_HOOK == 1
void vApplicationIdleHook()
{

}
#endif

Puedes escribir el código que quieres inyectar a la idle task directamente en esta función, o puedes hacerlo modular (indirecto, por si después necesitas agregar más funcionalidades sin ensuciar al código fuente). Para el ejemplo que veremos más adelante, un heartbeat(), utilizaremos la forma indirecta, es más limpia y el compilador se encarga de la optimización. De esta forma nuestra función vApplicationIdleHook() nos queda como:

#if configUSE_IDLE_HOOK == 1
void vApplicationIdleHook( void )
{
   void heartbeat_idle_hook();
   heartbeat_idle_hook();
}
#endif

4. Cuando activas la inyección de código en la idle task FreeRTOS espera una función vApplicationIdleHook(), la cual acabamos de escribir, pero así como está en este momento no hace nada, falta la función heartbeat_idle_hook() (o como gustes llamarle). Ésta es una función nuestra y la escribiremos en nuestro sketch (o donde nosotros querramos si estamos compilando desde la consola). La implementación completa está más adelante en el ejemplo, pero por el momento veamos su esqueleto para platicar un par de cosas interesantes:

// en un sketch o archivo fuente nuestro:

#ifdef __cplusplus
extern "C"{
#endif
void heartbeat_idle_hook()
{
   taskENTER_CRITICAL();
   {
      // tu código aquí
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif

1. La constante __cplusplus le dice al compilador que las funciones marcadas dentro de extern "C"{ ... } deben ser tratadas como funciones de C, no de C++ (recuerda que los sketches se compilan con C++). El compilador de C++ le cambia el nombre a las funciones para poder implementar cosas muy padres, como la sobrecarga de funciones, pero C no tiene idea de eso. Si la función heartbeat_idle_hook() no estuviera dentro de la sección extern "C"{ ... }, entonces su nombre podría ser algo tan raro como __vjgd74_heartbeat_idle_hook(). C++ sabe de este nombre porque él se lo puso, pero cuando mezclas C++ y C, éste último espera (de hecho es FreeRTOS quien espera) a la función heartbeat_idle_hook(), y aunque pareciera que sí existe, para el compilador de C no es así.

2. Tu código debe estar encerrado en una sección crítica (aplica sólo para micros de 8 bits):

taskENTER_CRITICAL();
{
   // tu código aquí
}
taskEXIT_CRITICAL();

Normalmente esto no es necesario (por «normalmente» hay que entender microcontroladores de 16 o 32 bits), pero de no hacerlo así el programa compilará pero no funcionará correctamente (le pasó al primo de un amigo y casi se vuelve loco). El problema es que en el código del heartbeat() que está más adelante utilizaremos aritmética de 16 bits, pero el ATmega328 es de 8 bits, y la atomicidad de las operaciones no está garantizada en un ambiente multitarea. El problema que tuve es el siguiente: cuando hacía restas (substracciones) sin región crítica obtuve resultados erróneos e inverosímiles, tales como: 1-0=65535 ó 0-0=65535. En un micro de 8 bits las operaciones y asignaciones de 16 y 32 bits ocupan varias instrucciones que deben ser ejecutadas de manera atómica (sin interrupciones), así que para asegurar dicha atomicidad encerré todo el código que involucra aritmética de 16 bits (el tipo TickType_t es de 16 bits) dentro de una región crítica.

RECUERDA: Las regiones críticas se deben ejecutar lo más rápido posible porque has desactivado a las interrupciones.

Tick del sistema

FreeRTOS utiliza un timer del microcontrolador para realizar cambios de contexto (cambiar entre tareas) periódicos. Los detalles de cuál timer y cuál configuración no son importantes en este momento; lo que sí es digno de anotarse es la frecuencia:

// en el archivo FreeRTOSConfig.h:
#define configTICK_RATE_HZ ( ( portTickType ) 1000 )

Esto significa que el planificador de tareas de FreeRTOS es llamado 1000 veces por segundo (la frecuencia), o lo que es lo mismo, una vez cada 1ms (el periodo). Aunque hemos estado utilizando este valor a lo largo del curso, no siempre tiene porqué ser tan rápido; quizás alguna aplicación requiera algo más lento, como 100 ó 10 veces por segundo.

Lo que verdaderamente nos interesa en este instante es que, así como le hicimos con la idle task, también podemos inyectar código en la función de FreeRTOS que es ejecutada en cada tick (con estos valores, cada 1ms).

RECUERDA: Tu código debe ser simple y no debe bloquear al tick. El tick NO es una tarea, es una función del sistema operativo que es llamada cada vez que el tiempo del timer correspondiente expiró, y desde el punto de vista del nivel sistema operativo utiliza la pila verdadera del chip.

Tick hook

Para que inyectes tu código debes realizar tres cosas (aplica la misma teoría de la idle hook, por lo cual no la repetiré a menos que se presenta alguna particularidad):

1. En el archivo FreeRTOSConfig.h asegúrate que la constante configUSE_TICK_HOOK está puesta a 1:

#define configUSE_TICK_HOOK 1

2. La hook del tick hook se llama vApplicationTickHook(). Ésta la puedes colocar donde quieras; en el proyecto Molcajete decidí poner todas las hooks en un mismo lugar (aunque puede ser diferente, por supuesto). Abre el archivo hooks.cpp (revisa la lección anterior para más detalles) y ubica a la función:

#if configUSE_TICK_HOOK == 1
void vApplicationTickHook( void )
{

}
#endif

Puedes escribir el código que quieres inyectar al tick directamente en esta función, o puedes hacerlo modular (indirecto, por si después necesitas agregar más funcionalidades sin ensuciar al código fuente). Para el ejemplo que veremos más adelante, un heartbeat(), utilizaremos la forma indirecta, es más limpia y el compilador se encarga de la optimización. De esta forma nuestra función vApplicationTickHook() nos queda como:

#if configUSE_TICK_HOOK == 1
void vApplicationTickHook( void )
{
   void heartbeat_tick_hook();
   heartbeat_tick_hook();
}
#endif

3. Cuando activas la inyección de código en el tick hook, FreeRTOS espera una función vApplicationTickHook(), la cual acabamos de escribir, pero así como está en este momento no hace nada, falta la función heartbeat_tick_hook() (o como gustes llamarle). Ésta es una función nuestra y la escribiremos en nuestro sketch (o donde nosotros querramos si estamos compilando desde la consola). La implementación completa está más adelante en el ejemplo, pero por el momento veamos su esqueleto:

// en un sketch o archivo fuente nuestro:

#ifdef __cplusplus
extern "C"{
#endif
void heartbeat_tick_hook()
{
   taskENTER_CRITICAL();
   {
      // tu código aquí
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif

Watchdog

En los sistemas embebidos un watchdogsupervisor» en español) es un contador hacia abajo independiente, que cuando llega a cero envía una señal de reset al procesador. Para evitarlo el programa debe estar constantemente alimentándolo (o como la mayoría de textos dice, «pateándolo«, del inglés «kicking the dog«). «Alimentar» significa escribir en unos registros especiales unos valores especiales, y también viene del inglés, feed the dog, otra expresón común con los supervisores. Mientras el programa se ejecuta correctamente, el supervisor es alimentado antes de que la cuenta llegue a cero; pero si el programa se pierde, y puede perderse por muchas razones, entonces ya no lo alimentará dejando que la cuenta llegue a cero, y por lo tanto, reiniciará al sistema.

Aunque cada procesador implementa sus propias formas de inicializar y alimentar al supervisor (sin olvidar que existen supervisores externos), el centro de la discusión no es saber cuándo alimentarlo (obviamente antes de que la cuenta llegue a cero), sino bajo cuáles condiciones debe ser alimentado.

Es decir, no se trata nada más de alimentarlo por alimentarlo, sino hacerlo cuando estemos seguros de que el sistema funciona correctamente. Si detectamos una anomalía, entonces no lo alimentamos, ponemos al sistema en modo seguro, y dejamos que el supervisor reinicie al sistema. Estos pasos son fundamentales en sistemas multitarea, como los que estamos desarrollando: si una tarea crítica falla, de las múltiples que podemos llegar a tener, entonces el sistema debe ser reiniciado.

¿Qué tiene que ver un supervisor con las hooks de FreeRTOS? Te lo responderé de la siguiente manera:

Jamás alimentes al supervisor desde una tarea/interrupción periódica.

Los gurús de los sistemas embebidos

Lo más fácil es alimentar al supervisor desde una interrupción periódica. El problema es que si el programa falla, lo más seguro es que la interrupción continúe, por lo tanto, el sistema no se reiniciará (este tipo de cosas también le han pasado al primo de un amigo).

Nosotros vamos a usar el hook de la idle task para implementar a nuestro supervisor por software. Esta tarea no es ni de cerca periódica, lo cual la convierte en una candidata excelente para esta actividad (y aunque podríamos escribir una tarea de baja prioridad para nuestro supervisor, ya tenemos a la hook, ¡aprovechémosla!). En términos generales, si otorgamos a las tareas críticas una prioridad alta, y alguna de éstas se cuelga, entonces la idle task no se ejecutará, y por lo tanto, tampoco nuestro supervisor por software, y como la alimentación del supervisor por hardware es responsabilidad de ésta, entonces no lo alimentará y el sistema se reiniciará.

¿Porqué no usar la tick hook? Por lo que acabo de mencionar. Al final de cuentas la tick hook no solamente es periódica, sino que funciona por fuera del ámbito de nuestras tareas de usuario, por lo cual, si una tarea se cuelga, el tick siempre seguirá funcionando, dando al traste con la idea principal de los supervisores.

Deadline

Los sistemas embebidos de tiempo real deben realizar su trabajo a tiempo, antes de que sea demasiado tarde: el sistema de frenos de un auto debe activar las bolsas de aire antes de un cierto tiempo (tiempo contabilizado después de haber recibido la señal de impacto), ya que de no hacerlo, el conductor podrá sufrir graves daños. Al tiempo entre que la señal se genera y el sistema responde le llamamos el deadline (o tiempo límite). Existen hard deadlines, como el ejemplo de las bolsas de aire, y también existen soft deadlines, donde pasarse un poco no tiene consecuencias fatales.

Te comento esto porque el ejemplo que veremos no sólo verifica que las tareas no se cuelguen, sino que además reiniciará al sistema si uno o más deadlines no se cumplen.

Implementación del supervisor por software

Hay muchas formas de implementar al supervisor por software (en inglés, «software watchdog«), y la que te mostraré en los ejemplos es una adaptación de una que encontré en:

[LEWIS02] D.W. Lewis. Fundamentals of embedded software. Prentice Hall, USA: 2002.

La idea es la siguiente: tendrás una lista de tareas críticas (no todas las tareas son o deberían ser críticas). Cada tarea tendrá 3 atributos: busy: Bool; failed: Bool; deadline: UINT; last: UINT. busy indica si la tarea está ocupada o no; failed significa si la tarea perdió el deadline (se pasó) o no; deadline guarda el tiempo máximo en el que la tarea debe responder; last indica el tiempo en el que tarea inició su proceso. Al inicio las banderas se ponen a false y estableces el deadline. Cuando un evento se presenta y dispara la ejecución de una tarea, busy se pone a true y guardar la marca de tiempo en last. Cuando la tarea finaliza la bandera failed se pone a true si el deadline se perdió (la tarea se tardó más), e incondicionalmente pone busy a false. El siguiente pseudocódigo muestra la forma general de este método:

// Tarea productora del evento
last[ TASK ] = current_time()
busy[ TASK ] = True
SemaphoreGive(...)


// Tarea consumidora del evento
while( True )
{
	SemaphoreTake(...)

	/* hace lo que tiene que hacer ... */

	if current_time() - last[ TASK ]  > deadline[ TASK ] {
		failed[ TASK ] = True
	}

	busy = False
}

// Software watchdog
Idle_hook()
{
	now = current_time()

	for i = 0; i < MAX_TASKS; ++i {

		if  failed[ i ] == True { break }
		// la tarea perdió su deadline
		
		if busy[ i ] == True && now - last[ i ] > dealine[ i ] { break }
		// ¡ la tarea se perdió por completo !
	}

	if i == MAX_TASKS { feed_the_dog() }
	else { clean_and_reset() }
}

Implementación del watchdog por hardware

El micro ATmega328 incluye un watchdog, pero no lo vamos a usar; en su lugar utilizaremos uno externo. Pero si no tienes uno o no quieres construir uno, entonces puedes usar el interno (aquí está una explicación superficial (en inglés) de cómo usar a este watchdog interno). Ahora que si quieres construir uno, aquí (en mi blog alternativo) podrás encontrar información sobre cómo construir uno con el chip 555. De hecho, la explicación la tuve que sacar de aquí porque la lección se estaba haciendo demasiado larga. Ya sea que visites mi otro blog, o no, aquí te dejo el diagrama del watchdog basado en el 555:

Ejemplos

Ejemplo 1: Heartbeat en la idle hook con tiempo simétrico

Este primer ejemplo, y el más sencillo, muestra cómo implementar un heartbeat() utilizando la hook de la idle task. Como no debemos usar funciones bloqueantes en las hooks (vTaskDelay(), por ejemplo), estaremos calculando diferencias de tiempos.

El tiempo es simétrico porque es el mismo tanto para el LED encendido como apagado. La función xTaskGetTickCount() devuelve el número de ticks transcurridos desde el último reinicio del programa. Nota que el único lugar donde utilizamos la macro de conversión pdMS_TO_TICKS() fue en la constante que establece el intervalo de tiempo; fuera de eso, el resto del código trabaja en término de ticks y no de milisegundos.

#include <FreeRTOS.h>
#include <task.h>

#define PROGRAM_NAME "hooks_1.ino"

#define TIME_INTERVAL pdMS_TO_TICKS( 500 )
// el LED parpadea cada 500ms

#ifdef __cplusplus
extern "C"{
#endif
void heartbeat_idle_hook()
{
   static bool led_state = false;
   static TickType_t last_time = 0;

   TickType_t current;
   TickType_t elapsed_time;

   taskENTER_CRITICAL();
   {
      current = xTaskGetTickCount(); 
      elapsed_time = current - last_time;

      if( elapsed_time >= TIME_INTERVAL ){

         last_time = current;

         digitalWrite( 13, led_state );
         led_state = !led_state;
      }
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif

void a_task( void* pvParameters )
{
    while( 1 )
    {
        vTaskDelay( pdMS_TO_TICKS( 497 ) );
        Serial.println( "Ok" );
    }
}

void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   pinMode( 13, OUTPUT );
   // lo vamos a usar en la hook de la idle task

   xTaskCreate( a_task, "TSK1", 128, NULL, tskIDLE_PRIORITY + 1, NULL );
   // establecemos que esta tarea tenga mayor prioridad que la idle task

   vTaskStartScheduler();

}

void loop() {}

La función xTaskGetTickCount() devuelve un valor de 16 bits para el ATmega328; esto implica que el contador interno de ticks de FreeRTOS pasa de 65535 a 0 (le decimos rollover) cada 65.535 segundos (cada minuto, aprox). Haber utilizado diferencias de tiempo como en la expresión: elapsed_time = current - last_time nos permite obtener siempre resultados correctos que permiten que el programa funcione de manera continua a pesar de los constantes rollovers.

Las variables led_state y last_time son ser estáticas porque deben mantener su valor entre llamadas.

El archivo hooks.cpp, incluyendo nada más el código de la idle hook queda así:

#include <Arduino.h>
#include <FreeRTOS.h>
#include <task.h>

#ifdef __cplusplus
extern "C"{
#endif

#if configUSE_MALLOC_FAILED_HOOK == 1
void vApplicationMallocFailedHook()
{
}
#endif

#if configCHECK_FOR_STACK_OVERFLOW > 0
void vApplicationStackOverflowHook( TaskHandle_t xTask, char *pcTaskName )
{
}
#endif

#if configUSE_IDLE_HOOK == 1
void vApplicationIdleHook()
{
   void heartbeat_idle_hook();
   heartbeat_idle_hook();
}
#endif

#if configUSE_TICK_HOOK == 1
void vApplicationTickHook( void )
{
}
#endif

#if configUSE_DAEMON_TASK_STARTUP_HOOK == 1
void vApplicationDaemonTaskStartupHook( void )
{
}
#endif

#ifdef __cplusplus
}
#endif

Ejemplo 2: Heartbeat en la idle hook con tiempo asimétrico

Este ejemplo es un poquito más complejo que el anterior y lo puedes utilizar si no quieres un tiempo simétrico, o si tu aplicación necesita intervalos asimétricos. La diferencia principal está en que utiliza la variable led_state como parte de una pequeña máquina de estados. El resto del código es idéntico al ejemplo anterior.

#include <FreeRTOS.h>
#include <task.h>

#define PROGRAM_NAME "hooks_2.ino"

#define TIME_ON  pdMS_TO_TICKS( 250 )
#define TIME_OFF pdMS_TO_TICKS( 750 )

#ifdef __cplusplus
extern "C"{
#endif
void heartbeat_idle_hook()
{
   static bool led_state = false;
   static TickType_t last_time = 0;

   TickType_t current;
   TickType_t elapsed_time;

   taskENTER_CRITICAL();
   {
      current = xTaskGetTickCount(); 
      elapsed_time = current - last_time;

      if( led_state == false && elapsed_time >= TIME_OFF ){

         digitalWrite( 13, HIGH );
         led_state = true;
         last_time = current;

      } else if( led_state == true && elapsed_time >= TIME_ON ){

         digitalWrite( 13, LOW );
         led_state = false;
         last_time = current;

      } else{  // siempre debemos completar el if-else if-else:
         ; 
      }
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif

void a_task( void* pvParameters )
{
    while( 1 )
    {
        vTaskDelay( pdMS_TO_TICKS( 497 ) );
        Serial.println( "Ok" );
    }
}

void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   pinMode( 13, OUTPUT );
   // lo vamos a usar en la hook de la idle task

   xTaskCreate( a_task, "TSK1", 128, NULL, tskIDLE_PRIORITY + 1, NULL );
   // establecemos que esta tarea tenga mayor prioridad que la idle task

   vTaskStartScheduler();
}

void loop() {}

Ejemplo 3: Heartbeat en la tick hook con tiempo asimétrico

En este tercer ejemplo vamos a implementar el heartbeat() con tiempos asimétricos utilizando la tick hook. A diferencia de lo que hicimos en la idle hook, donde teníamos que utilizar diferencias de tiempos porque no sabíamos cada cuándo se activaría la tarea, en este caso solamente vamos a «contar» ticks, ya que esta hook se llama 1 vez en cada tick:

#include <FreeRTOS.h>
#include <task.h>

#define PROGRAM_NAME "hooks_3.ino"

#define TIME_ON  pdMS_TO_TICKS( 100 )
#define TIME_OFF pdMS_TO_TICKS( 900 )

#ifdef __cplusplus
extern "C"{
#endif
void heartbeat_tick_hook()
{
   static bool led_state = false;
   static uint16_t ticks = TIME_OFF;

   taskENTER_CRITICAL();
   {
      --ticks;
      if( ticks == 0 ){

         digitalWrite( 13, led_state );
         led_state = !led_state;

         ticks = led_state == false ? TIME_ON : TIME_OFF;
      }
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif

void a_task( void* pvParameters )
{
    while( 1 )
    {
        vTaskDelay( pdMS_TO_TICKS( 497 ) );
        Serial.println( "Ok" );
    }
}

void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   pinMode( 13, OUTPUT );
   // lo vamos a usar en la hook del tick system


   xTaskCreate( a_task, "TSK1", 128, NULL, tskIDLE_PRIORITY, NULL );
   // no importa la prioridad

   vTaskStartScheduler();
}

void loop() {}

Antes de ejecutar este programa asegúrate que en el archivo hooks.cpp tienes las siguientes líneas de código (no pongo el contenido completo de este archivo para no alargar la lección):

#if configUSE_TICK_HOOK == 1
void vApplicationTickHook( void )
{
   void heartbeat_tick_hook();
   heartbeat_tick_hook();
}
#endif

Ejemplo 4: Supervisor por software basado en la idle hook y el chip 555

Este ejemplo es una implementación del supervisor por software que mencioné anteriormente. Tiene 4 tareas, 3 de ellas críticas (la task_3() no es crítica).

Agrupé en una structura los parámetros necesarios para el supervisor, en lugar de mantenerlos sueltos, y declaré un arreglo para 3 tareas críticas:

struct Watchdog
{
   bool       busy{ false };
   bool       failed{ false };
   TickType_t deadline{ 0 };
   TickType_t last_time{ 0 };

   // podríamos agregar un ID que nos ayude a identificar la tarea que falló
};

// No todas las tareas deben ser supervisadas:
#define MAX_TASKS 3
Watchdog tasks[ MAX_TASKS ];

El código del supervisor es el siguiente. Un ciclo recorre la lista de tareas críticas y en cuanto una de ellas se cuelgue o el deadline no se haya cumplido, sale y ejecuta la función clean_and_reset(); en caso de no encontrar problemas, llama a la función feed_the_dog():

void watchdog_idle_hook()
{
   TickType_t now;
   uint8_t i;

   // las secciones críticas se necesitan cuando un micro de 8 bits debe hacer
   // operaciones o asignaciones de 16 o 32 bits en un ambiente multitarea

   taskENTER_CRITICAL();
   {
      now = xTaskGetTickCount();

      for( i = 0; i < MAX_TASKS; ++i ){

         if( tasks[ i ].failed == true ) break;
         // la tarea perdió el deadline (se tardó más de lo debido)

         TickType_t diff = now - tasks[ i ].last_time;
         if( tasks[ i ].busy == true && diff > tasks[ i ].deadline ) break;
         // la tarea se perdió
      }

      if( i == MAX_TASKS ) feed_the_dog();
      // ninguna tarea se colgó y todos los deadlines se cumplieron

      else clean_and_reset();
      // una o más tareas se colgaron, o uno o más deadlines no se cumplieron
   }
   taskEXIT_CRITICAL();
}

La función para alimentar al supervisor externo es feed_the_dog(), la cual envía un pulso negativo (ALTO-BAJO-ALTO) de 100 microsegundos antes de que el tiempo expire. Escogí el pin D4 de Arduino:

void feed_the_dog()
{
   // alimentamos al perro guardián:

   digitalWrite( 4, LOW );
   delayMicroseconds( 100 );
   digitalWrite( 4, HIGH );
}

Sin mayor problema podrías adaptarla para que alimentes al watchdog interno, si es que decidiste utilizarlo.

La función clean_and_reset() es llamada cuando el sistema falló. Ésta forza un reinicio, pero no sin antes poner al sistema a un estado seguro, y avisar de alguna forma, que algo salió mal:

void clean_and_reset()
{
   // pone el sistema a un estado seguro


   Serial.println( "***" );
   // avisamos con un texto, un LED, un timestamp, etc.


   // obligamos a un watchdog reset:
   vTaskSuspendAll();
   taskDISABLE_INTERRUPTS();
   while( 1 )
      ;
}

El esqueleto de una tarea crítica en este programa es:

void critical_task( void* pvParameters )
{
   tasks[ 0 ].deadline = TIME_LIMIT;
   // el deadline es estático

   // inicializa lo relacionado con la tarea...

   while( 1 )
   {
      tasks[ TASK ].busy = true;
      tasks[ TASK ].last_time = xTaskGetTickCount();


      // --- Empieza el proceso:

      // aquí va el proceso crítico...

      // --- Finalizó el proceso


      TickType_t diff = xTaskGetTickCount() - tasks[ TASK ].last_time;
      if( diff > tasks[ TASK ].deadline ) tasks[ TASK ].failed = true;

      tasks[ TASK ].busy = false;
   }
}

Finalmente, actualiza el código del archivo hooks.cpp para que la idle hook mande llamar al supervisor:

#if configUSE_IDLE_HOOK == 1
void vApplicationIdleHook()
{
   void watchdog_idle_hook();
   watchdog_idle_hook();
}
#endif

(No olvides poner la constante configUSE_IDLE_HOOK a 1 en el archivo FreeRTOSConfig.h)

Ya tenemos todos los elementos para poder ver al ejemplo completo:

  • La task_1 hace parpadear a un LED a una razón que se va incrementendo en cada paso lo que hace que a larga el programa falle.
  • La task_2 imprime su nombre en la consola serial. No falla.
  • Las tareas task_3 y task_4 trabajan de manera coordinada; la primera hace las veces de generadora de eventos, mientras que la segunda los consume. Este par de tareas podrían hacer que el sistema falle.

Por supuesto las tareas las diseñé así para que el programa falle bajo condiciones controladas.

#include <FreeRTOS.h>
#include <task.h>

#define PROGRAM_NAME "hooks_4.ino"

#define TIME_LIMIT_1 pdMS_TO_TICKS( 500 )
#define TIME_LIMIT_2 pdMS_TO_TICKS( 500 )
#define TIME_LIMIT_3 pdMS_TO_TICKS( 500 )


struct Watchdog
{
   bool       busy{ false };
   bool       failed{ false };
   TickType_t deadline{ 0 };
   TickType_t last_time{ 0 };

   // podríamos agregar un ID que nos ayude a identificar la tarea que falló
};


// No todas las tareas deben ser supervisadas:
#define MAX_TASKS 3
Watchdog tasks[ MAX_TASKS ];


TaskHandle_t consumer_handler = NULL;
// lo necesita una de las tareas (la productora)


#ifdef __cplusplus
extern "C"{
#endif

void feed_the_dog()
{
   // alimentamos al perro guardián:

   digitalWrite( 4, LOW );
   delayMicroseconds( 100 );
   digitalWrite( 4, HIGH );
}

void clean_and_reset()
{
   // pone el sistema a un estado seguro


   Serial.println( "***" );
   // avisamos con un texto, un LED, un timestamp, etc.


   // obligamos a un watchdog reset:
   vTaskSuspendAll();
   taskDISABLE_INTERRUPTS();
   while( 1 )
      ;
}

//----------------------------------------------------------------------
//                     watchdog_idle_hook()
//----------------------------------------------------------------------
void watchdog_idle_hook()
{
   TickType_t now;
   uint8_t i;

   // las secciones críticas se necesitan cuando un micro de 8 bits debe hacer
   // operaciones o asignaciones de 16 o 32 bits en un ambiente multitarea

   taskENTER_CRITICAL();
   {
      now = xTaskGetTickCount();

      for( i = 0; i < MAX_TASKS; ++i ){

         if( tasks[ i ].failed == true ) break;
         // la tarea perdió el deadline (se tardó más de lo debido)


         TickType_t diff = xTaskGetTickCount() - tasks[ i ].last_time;
         if( tasks[ i ].busy == true && diff > tasks[ i ].deadline ) break;
         // la tarea se perdió

      }

      if( i == MAX_TASKS ) feed_the_dog();
      else clean_and_reset();
   }
   taskEXIT_CRITICAL();
}
#ifdef __cplusplus
}
#endif


//----------------------------------------------------------------------
//                          task_1()
//----------------------------------------------------------------------
void task_1( void* pvParameters )
{
   tasks[ 0 ].deadline = TIME_LIMIT_1;
   // el deadline es estático

   pinMode( 2, OUTPUT );
   // un led en D2

   bool led_state = false;

   uint16_t excess_time = pdMS_TO_TICKS( 50 );

   while( 1 )
   {

      tasks[ 0 ].busy = true;
      tasks[ 0 ].last_time = xTaskGetTickCount();


      // --- Empieza el proceso:

      vTaskDelay( excess_time );

      digitalWrite( 2, led_state );
      led_state = !led_state;

      Serial.println( pcTaskGetName( NULL ) );
      // hacemos tiempo calculando el nombre en lugar de usar texto estático

      // --- Finalizó el proceso


      TickType_t diff = xTaskGetTickCount() - tasks[ 0 ].last_time;
      if( diff > tasks[ 0 ].deadline ) tasks[ 0 ].failed = true;

      tasks[ 0 ].busy = false;


      excess_time += pdMS_TO_TICKS( 5 );
   }
}

//----------------------------------------------------------------------
//                          task_2()
//----------------------------------------------------------------------
void task_2( void* pvParameters )
{
   tasks[ 1 ].busy = false;
   tasks[ 1 ].failed = false;
   tasks[ 1 ].deadline = TIME_LIMIT_2;

   while( 1 )
   {
      tasks[ 1 ].busy = true;
      tasks[ 1 ].last_time = xTaskGetTickCount();


      // --- Empieza el proceso:

      vTaskDelay( pdMS_TO_TICKS( 93 ) );

      Serial.println( pcTaskGetName( NULL ) );

      // --- Finalizó el proceso


      TickType_t diff = xTaskGetTickCount() - tasks[ 1 ].last_time;
      if( diff > tasks[ 1 ].deadline ) tasks[ 1 ].failed = true;
      tasks[ 1 ].busy = false;
   }
}

//----------------------------------------------------------------------
//                          task_3()
//----------------------------------------------------------------------
void task_3( void* pvParameters )
{
   configASSERT( consumer_handler );

   // las inicializamos aquí porque la tarea productora tiene mayor prioridad que la consumidora:
   tasks[ 2 ].busy = false;
   tasks[ 2 ].failed = false;
   tasks[ 2 ].deadline = TIME_LIMIT_3;


   TickType_t last_wake_time = xTaskGetTickCount();

   uint16_t excess_time = 0;

   while( 1 )
   {
      vTaskDelayUntil( &last_wake_time, pdMS_TO_TICKS( 111 + excess_time  ) );

      // actualizamos los atributos ANTES de dar el aviso:
      tasks[ 2 ].busy = true;
      tasks[ 2 ].last_time = xTaskGetTickCount();

      digitalWrite( 13, HIGH );

      xTaskNotifyGive( consumer_handler );
      // avisamos

      Serial.println( pcTaskGetName( NULL ) );

      excess_time += 10;
   }
}

//----------------------------------------------------------------------
//                          task_4()
//----------------------------------------------------------------------
void task_4( void* pvParameters )
{
   while( 1 )
   {
      // --- Empieza el proceso:

      // los atributos fueron establecidos en el productor antes de dar el aviso

      ulTaskNotifyTake( pdTRUE, portMAX_DELAY );

      Serial.println( pcTaskGetName( NULL ) );

      digitalWrite( 13, LOW );

      // --- Finalizó el proceso


      TickType_t diff = xTaskGetTickCount() - tasks[ 2 ].last_time;
      if( diff > tasks[ 2 ].deadline ) tasks[ 2 ].failed = true;

      tasks[ 2 ].busy = false;
   }
}

//----------------------------------------------------------------------
//                         setup()
//----------------------------------------------------------------------
void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   pinMode( 13, OUTPUT );

   pinMode( 4, OUTPUT );
   digitalWrite( 4, HIGH );

   xTaskCreate( task_1, "TSK1", 128, NULL, tskIDLE_PRIORITY + 0, NULL );
   // requiere supervisión, podría fallar

   xTaskCreate( task_2, "TSK2", 128, NULL, tskIDLE_PRIORITY + 1, NULL );
   // requiere supervisión, pero nunca falla

   xTaskCreate( task_3, "TSK3", 128, NULL, tskIDLE_PRIORITY + 2, NULL );
   // productor (no requiere supervisión)

   xTaskCreate( task_4, "TSK4", 128, NULL, tskIDLE_PRIORITY + 1, &consumer_handler );
   // consumidor (requiere supervisión)


   feed_the_dog();

   vTaskStartScheduler();
}

void loop() {}

¿Qué sigue?

Hoy vimos 3 temas importantes: 2 sobre cómo inyectar código nuestro al sistema operativo, y otro sobre cómo implementar y utilizar un supervisor por software utilizando un watchdog externo.

Tú podrías utilizar ya sea un watchdog externo propiamente dicho, o el interno al ATmega328.

RECUERDA: No utilices funciones bloqueantes dentro de las hooks de FreeRTOS.

RECUERDA: Nunca alimentes al watchdog desde una tarea o interrupción periódica.

RECUERDA: Revisa que el sistema está en perfectas condiciones antes de alimentar al supervisor.

Índice del curso

Espero que esta lección haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Administración de la memoria

No podíamos terminar este curso sin mencionar brevemente la forma en que FreeRTOS administra la memoria. De ésta tenemos dos tipos, la memoria dinámica y la memoria estática, siendo la primera la que más problemas da. La memoria dinámica es memoria RAM que le pedimos al sistema operativo cuando el programa ya está ejecutándose. En programas de computadora esta es una actividad que realizamos todo el tiempo, sin apenas darnos cuenta. Aunque lenguajes como C/C++ nos obligan a devolver manualmente la memoria que hayamos pedido, hay otros más «pesados» (en términos de recursos y tiempo de procesamiento) que devuelven la memoria por nosotros, y además pareciera, en cualquier caso, que la memoria RAM nunca se termina. Y somos felices.

Sin embargo, los sistemas embebidos son arena de otro costal. La memoria RAM está severamente limitada y raras veces podemos darnos el lujo de pedir y devolver memoria en tiempo de ejecución. Y aunque tuviéramos la posibilidad de hacerlo debemos preguntarnos si eso es realmente lo que queremos.

La memoria dinámica tiene varios problemas que se amplifican en nuestros sistemas con recursos limitados:

  • Las llamadas a la función malloc() no son deterministas; esto es, mientras que una llamada puede tomar 300 microsegundos, otra podría tomar 5 milisegundos, y luego otra 1 milisegundo.
  • La función malloc() no es re-entrante. Esto es, la función no está diseñada para ser accesada por dos o más tareas al mismo tiempo en un programa concurrente, con lo cual el sistema podría corromperse. Este problema se podría resolver desactivando las interrupciones o al sistema operativo mientras malloc() se está ejecutando. El problema es que en algunos casos la búsqueda de un bloque de memoria contiguo puede tardar mucho.
  • La memoria se fragmenta. Cuando tu programa inicia toda la memoria es contigua, pero cada vez que la pides y la devuelves se van creando huecos, y podría llegar a ser que tengas, digamos, 100 bloques no contiguos (es decir, con huecos entre uno y otro) de 16 bytes (por dar un número) cada uno. En total tienes 1600 bytes NO contiguos. ¿Qué sucedería si haces una petición de 20 bytes? La llamada a malloc() va a fallar porque en el escenario propuesto no hay ningún bloque contiguo de al menos 20 bytes. «Oye, pero tengo 1600 bytes, ¿cómo es eso posible?». Bueno, la memoria se fragmentó.
  • Fugas de memoria. Las funciones que piden memoria, deben devolverla (en los casos más clásicos. Cuando veamos los diferentes esquemas de FreeRTOS retomaré este punto), ¿y qué pasa si la devuelves? Ese bloque de memoria queda marcado como «en uso», por lo que aunque ya no la estés utilizando, a los ojos del sistema operativo sí lo estás haciendo. La consecuencia es que nadie más, durante la vida del programa, podrá usar esa memoria.
  • FreeRTOS no devuelve la memoria inmediatamente. Uno pensaría que cuando llamas a la función free() la memoria se devuelve al momento y queda lista para volver a ser utilizada. Bueno, esto no es del todo cierto. FreeRTOS devuelve la memoria cuando tiene un poco de tiempo para hacerlo. Esto significa que si la carga de trabajo de tu aplicación es alta y FreeRTOS no se hace de tiempo, entonces esa memoria que devolviste… realmente no ha sido devuelta. Y lo peor de todo es que el escenario no es determinista (otra vez) por lo que repetir el incidente para investigarlo será casi imposible.
  • ¿Cuánta memoria es suficiente?. La memoria dinámica se toma de una región llamada heap, y tú estableces el tamaño (en bytes) de esa región. En los ejemplos que hemos estado viendo el heap está establecido como 1500 bytes, de los 2048 que tiene el ATmega328. ¿1500 es mucho o poco? ¿Porqué no los 2048? ¿Porqué no 1024? ¿Qué sucede con los bytes que no usemos del heap? FreeRTOS proporciona herramientas para determinar si 1500 bytes es mucho o poco, lo cual es tema de esta lección.

Durante mucho tiempo la creación dinámica de objetos fue la única disponible en FreeRTOS, y no fue sino hasta la versión 9.0.0 que se introdujo la creación estática de objetos. Y antes de ella siempre me había preguntado porqué no estaba «de fábrica», ya que yo conocía de los problemas de la memoria dinámica. De hecho, los gurús de los sistemas embebidos advierten sobre su utilización y aconsejan no usarla. Afortunadamente me leyeron la mente e introdujeron el concepto a FreeRTOS.

Ahora que si a pesar de los problemas descritos, quieres seguir utilizando la creación dinámica de objetos, o estás trabajando sobre un proyecto que ya la usa (ya me pasó), o tu aplicación a mano los necesita, sí o sí, entonces lo mejor es que conozcas cómo se comporta y se administra la memoria para que no te lleves (desagradables) sorpresas.

En esta lección vamos a platicar sobre algunos conceptos importantes relacionados con la memoria, y de herramientas de FreeRTOS que nos permitirán determinar los tamaños óptimos para el heap y las pilas de las tareas, para intentar disminuir los problemas que se nos presenten.

Tabla de contenidos

¿Qué es y qué necesito?

Vamos a platicar sobre dos problemas que se nos presentan en el desarrollo de programas con FreeRTOS (y en general, con cualquier sistema operativo):

  • Desboardamiento del heap: un heap demasiado grande equivale a desperdicio de memoria, y uno demasiado pequeño equivale a problemas.
  • Desbordamiento de la pila. Igual que el anterior, pero con la pila de cada tarea.

Y también sobre algunas herramientas que FreeRTOS ofrece para afinar sus respectivos tamaños.

Heap

¿De dónde sale la memoria para la creación dinámica de objetos? La memoria es tomada de un área conocida como el heap (se pronuncia más o menos [jép]. En español es montón). En FreeRTOS es un arreglo cuyo tamaño tú defines en el archivo FreeRTOSConfig.h:

#define configTOTAL_HEAP_SIZE (( UBaseType_t )( 1500 ))

(Para el ATmega328 el tipo UBaseType_t está difinido como unsigned char. Para más detalles revisa el archivo portmacro.h.)

Aunque el ATmega328 tiene 2048 bytes de RAM, no es recomendable usar toda para el heap, ya que cuando declaras variables globales, o usas funciones antes de iniciar al sistema operativo, o usas interrupciones, necesitarás memoria que no es parte del heap.

Como mencioné en la introducción, ¿1500 bytes es mucho o poco? ¿Y si tengo dos tareas nada más, cada una con una pila de 256 bytes, qué sucede con la memoria no utilizada del heap? Hacer a ésta área de memoria muy grande o muy chica tiene sus consecuencias. Si la haces muy grande y no la utilizas toda, entonces estarás desperdiciando valiosa memoria (la RAM no se da en los árboles); pero si la haces muy chica, entonces se terminará antes de lo esperado y no podrás crear más objetos dinámicos mientras el programa se está ejecutando (tiempo de ejecución, le decimos). Los objetos estáticos no tienen esta disyuntiva: si eventualmente se termina la memoria lo sabrás cuando estés compilando el programa (tiempo de compilación) y sabrás qué hacer antes de instalar tu aplicación en la copa de un árbol; en el caso de que se agote la memoria en tiempo de ejecución, entonces tendrás que subir a la copa del árbol a presionar el botón de reset.

En FreeRTOS es posible que una tarea cree a otra tarea (u objetos dinámicos), pero lo más recomendable, en la medida de los posible, es crear TODOS los objetos dinámicos al inicio del programa, y probar que realmente se hayan creado, y no sólo suponerlo. Además, tampoco deberías destruirlos (es decir, devolver su memoria). Este escenario nos hace pensar en utilizar, mejor, objetos estáticos.

En algunas aplicaciones podría ser prohibitivo o inconveniente, desde muchos ángulos, probar el valor valor devuelto por la función xTaskCreate() de todas las tareas que hayas creado. Para estos casos puedes indicarle a FreeRTOS que te avise cuando una llamada a malloc() falle, es decir, cuando malloc() no haya encontrado la memoria contigua requerida.

Primero deberás decirle a FreeRTOS que quieres el aviso. Para ello debes poner a 1 la constante configUSE_MALLOC_FAILED_HOOK en el archivo FreeRTOSConfig.h:

#define configUSE_MALLOC_FAILED_HOOK 1

Luego debes escribir una función callback (también conocidas como hooks) que será llamada por FreeRTOS. Una función callback o hook es una función que tú escribes, pero que el programa (en este caso FreeRTOS) utilizará como si fuera parte de él, sin ser parte de él. También le llamamos inyección de dependencias. Anteriormente en el curso ya hemos usado varias callbacks, así que el término no te debería ser desconocido. La función callback que deberás escribir es la siguiente:

Dentro de ella tú estableces qué hacer en caso de que la memoria del heap se haya agotado, y todo dependerá de la aplicación que estés escribiendo. Las acciones podrían ir desde activar un LED de error, o escribir un mensaje en la consola, o algo más drástico como resetear al sistema. Ten en cuenta que esta función la utilizarías mientras el programa está en desarrollo o pruebas y es para que te des cuenta si el tamaño de heap debe ser incrementado.

En el Ejemplo 1 que veremos más adelante está una posible implementación. Para que este mecanismo funcione asegúrate de crear todos los objetos dinámicos, directa o indirectamente, que tu programa va a necesitar.

Desbordamiento de la pila

Cuando creas tareas debes especificar la cantidad de memoria para su pila. La pila es una estructura de datos dinámica (en el sentido de que su contenido está cambiando continuamente) que almacena las variables declaradas en la tarea, así como los argumentos que envías a funciones y los valores que éstas pudieran devolver. Si tu tarea, o las funciones a las que llama, declaran muchas variables (o unas pocas pero del tipo arreglo), o las llamadas a las funciones son muy profundas, corres el riesgo de que la pila se desborde por tener un tamaño insuficiente. El desbordamiento de la pila es cuando se rebasa el tamaño máximo de la misma, por las razones que mencioné. ¿Cuál es el tamaño ideal para una pila? Es difícil decirlo; esta es una tarea que involucra la experiencia y el sentido común. FreeRTOS propone un tamaño mínimo para la pila para cada procesador. En el caso del ATmega328 es:

#define configMINIMAL_STACK_SIZE ( ( UBaseType_t ) 85 )

Y de ahí para arriba. Pero ¿cuánto es suficiente? Afortunadamente FreeRTOS incluye herramientas que nos permitirán verificar si la pila se ha desbordado, y en caso afirmativo, afinar su tamaño. Debes activar esta herramienta antes de usarla, estableciendo un valor 1 ó 2 en el archivo FreeRTOSConfig.h:

/*
 * 0: Deshabilitada
 * 1: Precisa
 * 2: Más precisa
 */
#define configCHECK_FOR_STACK_OVERFLOW 1

Cuando este mecanismo detecte un desbordamiento de la pila mandará llamar una función hook llamada vApplicationStackOverflowHook():

Los argumentos xTask y pcTaskName contienen cada uno, de manera respectiva, el handler y el nombre de la tarea que hizo que llegaras ahí.

(Nota: Por algún desfase entre el código y la documentación, el tipo del parámetro pcTaskName se muesra como signed char*, sin embargo el compilador lo rechazará; en su lugar usa char* únicamente, como lo he hecho en los ejemplos más adelante.)

En caso de un problema con la pila FreeRTOS buscará la función vApplicationStackOverflowHook() en tu proyecto y llevará a cabo lo que ahí indiques.

¿Qué hay de los valores 1 y 2 de la constante configCHECK_FOR_STACK_OVERFLOW? En realidad son dos métodos para la detección de los desbordamientos.

Método 1

configCHECK_FOR_STACK_OVERFLOW es puesta a 1. FreeRTOS observará, en cada cambio de contexto, si la pila no ha sido desbordada. En caso de desbordamiento llamará a la función hook vApplicationStackOverflowHook().

Método 2

configCHECK_FOR_STACK_OVERFLOW es puesta a 2. FreeRTOS llenará la pila con un valor conocido cuando la tarea sea creada. Posteriormente, en cada cambio de contexto observará los últimos 16 bytes de la pila para ver si cambiaron con respecto al valor inicial. Este método es más lento, pero te avisará antes de un posible desbordamiento. El aviso viene en forma de una llamada a la función hook vApplicationStackOverflowHook(). Esta funcionalidad se presta para ser utilizada con un depurador por hardware.

Ambos métodos introducen más carga de trabajo al sistema, por lo que deberías utilizarlas solamente mientras desarrollas la aplicación, o en la etapa de pruebas. Además, no tiene sentido continuar un programa si su pila se ha desbordado, so pena de muchos problemas.

FreeRTOS indica que no existe una garantía de capturar todos los desbordamientos, y es lógico. Pero tú puedes ayudarlo haciendo que tu programa recorra todos los caminos posibles en la lógica de tu aplicación. Imagina que uno de estos caminos manda llamar a funciones que ocupan mucha memoria, pero asumes que «en la vida real» la condición para tomar ese camino «nunca» se dará. Resulta que como la vida real es caprichosa («Si puede suceder, entonces sucederá«) un día se le ocurrirá tomar ese camino y la pila se desbordará (ya cuando tu aplicación ha sido distribuída y el dispositivo ha sido instalado en la copa de un árbol en medio de la selva). Moraleja:

Si programas un camino crítico en tu programa, entonces haz pruebas que lo recorran.

Antes de terminar esta sección sobre conceptos veamos las diferentes opciones que FreeRTOS ofrece para las funciones malloc()/free().

Implementaciones de malloc

FreeRTOS incluye 5 implementaciones de malloc(), cada una con características y potencias diferentes, dentro de archivos con el nombre heap_x.c (donde la x va desde 1 hasta 5):

  1. heap_1.c. Es la implementación más simple y no permite devolver la memoria (la función free() no está definida). Su uso no es recomendado debido a que la creación estática de objetos la vino a substituir.
  2. heap_2.c. Igual que heap_1.c, pero permite devolver la memoria, sin juntar los fragmentos, cuyo resultado es memoria fragmentada.
  3. heap_3.c. Utiliza las funciones malloc() y free() de tu compilador/plataforma. Por ejemplo, en el caso del ATmega328 esta implementación simplemente manda llamar a dichas funciones que ya vienen incluídas en el compilador avr-gcc. NOTA: El projecto Molcajete utiliza este esquema; pero si tú quisieras usar algún otro, simplemente descárgalo y cópialo en el directorio /src de FreeRTOS. (Por favor lee la nota NOTA IMPORTANTE al final del primer ejemplo.)
  4. heap_4.c. Igual que heap_2.c, pero junta los fragmentos. Por supuesto es más lenta, pero la memoria no se fragmenta. FreeRTOS recomienda utilizar este esquema en lugar de heap_2.c para programas nuevos.
  5. heap_5.c. Igual que heap_4.c, pero permite distribuir el heap entre diferentes regiones de RAM. Algunos microcontroladores modernos y potentes incluyen no una región de RAM, sino varias no contiguas. Este esquema aprovecha todas las regiones, y obviamente es el más complicado de todos.

Recuerda: Solamente un esquema de los 5 mencionados debe estar activo a un mismo tiempo, de lo contrario el programa no compilará (por duplicidad de símbolos).

Función uxTaskGetStackHighWaterMark()

Si quieres saber cuánta memoria queda en la pila de una tarea debes utilizar la función uxTaskGetStackHighWaterMark():

Esta función devuelve el número de elementos aún no utilizados (words o bytes, depende de la definición de UBaseType_t) desde que la tarea indicada por el argumento xTask inició. Puedes usar el valor NULL para referirte a la propia tarea que hizo la llamada. En los ejemplos más adelante veremos su uso.

Configuración

Antes de ver los ejemplos de esta sección debes de saber que la forma de construir (compilar) sketches es un tanto especial cuando integras código de terceros, tal como FreeRTOS, y las cosas no siempre funcionan como uno esperaría. Y este es el caso de las funciones hook que te he platicado.

Idealmente deberías escribir el código de la hook en el mismo sketch de tu aplicación, pero no funciona. Por esta razón creé originalmente un archivo especial hooks.c en el directorio /src de FreeRTOS del proyecto Molcajete. Pero este tiene un problema, ¡es un archivo de lenguaje C, no de C++!. Esto significa que en dicho archivo no podrás usar objetos de C++, como por ejemplo Serial.println(), ya que en C no existen objetos. Pero si utilizaras funciones como digitalWrite() no tendrías inconvenientes. Sin embargo, para efectos de depuración de los programas es mejor contar con una salida de texto.

Por ello deberás realizar los siguientes 3 simples pasos antes de probar los ejemplos de la siguiene sección:

  1. Ubica (pero no lo abras, no es necesario) al archivo hooks.c, el cual está en /tu/ruta/de/instalación/de/molcajete/arduino-1.8.13/libraries/FreeRTOS/src.
  2. Cámbiale el nombre a hooks.c.txt. Este paso es para que no lo borres y para que el compilador no lo considere en el proceso de compilación. Por supuesto que también puedes borrarlo con toda seguridad.
  3. En ese mismo directorio crea un nuevo archivo hooks.cpp con el siguiente contenido (copia y pega para evitar errores de dedo):
#include <Arduino.h>
#include <FreeRTOS.h>
#include <task.h>

#ifdef __cplusplus
extern "C"{
#endif


#if configUSE_MALLOC_FAILED_HOOK == 1
void vApplicationMallocFailedHook()
{
}
#endif

#if configCHECK_FOR_STACK_OVERFLOW > 0
void vApplicationStackOverflowHook( TaskHandle_t xTask, char *pcTaskName )
{
}
#endif

#if configUSE_IDLE_HOOK == 1
void vApplicationIdleHook()
{
}
#endif

#if configUSE_TICK_HOOK == 1
void vApplicationTickHook( void )
{
}
#endif

#ifdef __cplusplus
}
#endif

En este archivo he concentrado todas las hooks que FreeRTOS pudiera llegar a necesitar; para la lección de hoy solamente utilizaremos las primeras dos.

Ejemplos

Los ejemplos a continuación los vas a estar trabajando con tres archivos abiertos: hooks.cpp (para que escribas el código de tus hooks), FreeRTOSConfig.h (para jugar con las constantes), y los archivos de tus sketches.

Ejemplo 1: Terminándonos el heap

(* Nota importante al final del ejemplo.)

En este ejemplo vamos a pedir más memoria de la que establecimos para el heap y veremos si la función vApplicationMallocFailedHook() se manda llamar. Recuerda que estas herramientas son para cuando estás desarrollando o probando tus aplicaciones, no son para código de producción (es decir, el código que ejecuta tu aplicación ya en campo).

Abre el archivo hooks.cpp y modifica la función vApplicationMallocFailedHook() con el siguiente contenido:

void vApplicationMallocFailedHook()
{
   Serial.println( "ERR: Se agotó el heap. El programa se detiene." );

   while( 1 )
      ;
}

El ciclo infinito while( 1 ); es para que el programa no continúe. Si esta función se llega a llamar es porque debes aumentar el heap o afinar el tamaño de la pila de cada tarea. Que el programa continue no tendría sentido.

Es importante que en este tipo de funciones no utilices funciones bloqueantes (xTaskDelay(), temporizadores, xQueueReceive(), etc) dado que tu programa está en una situación crítica; sin embargo sí que podrás utilizar algunas funciones de FreeRTOS que devuelven valores sin bloquearse (es decir, que regresan inmediatamente), o funciones de Arduino que no sean bloqueantes, como digitalWrite(), por ejemplo.

Luego escribe un sketch con el siguiente contenido:

#include <FreeRTOS.h>
#include <task.h>

// No olvides poner configUSE_MALLOC_FAILED_HOOK a 1 en el archivo FreeRTOSConfig.h

#define PROGRAM_NAME "mem_mang_1.ino"


void led_task( void* pvParameters )
{
    uint8_t pin = (uint8_t) pvParameters;

    pinMode( pin, OUTPUT );

    while( 1 )
    {
        digitalWrite( pin, HIGH );
        vTaskDelay( pdMS_TO_TICKS( 497 ) );
        digitalWrite( pin, LOW );
        vTaskDelay( pdMS_TO_TICKS( 497 ) );
    }
}

void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );


   xTaskCreate( led_task, "LD1", 256, (void*) 13, tskIDLE_PRIORITY, NULL );
   Serial.println( "La tarea 1 se creó ..." );

   xTaskCreate( led_task, "LD2", 256, (void*) 12, tskIDLE_PRIORITY, NULL );
   Serial.println( "La tarea 2 se creó ..." );

   xTaskCreate( led_task, "LD3", 256, (void*) 11, tskIDLE_PRIORITY, NULL );
   Serial.println( "La tarea 3 se creó ..." );

   xTaskCreate( led_task, "LD4", 256, (void*) 10, tskIDLE_PRIORITY, NULL );
   Serial.println( "La tarea 4 se creó ..." );


   vTaskStartScheduler();
}

void loop() {}

Este programa intentará crear 4 tareas (reutilizando el código de una misma, aunque eso es lo de menos en el ejemplo), cada una pidiendo 256 words, es decir, 512 bytes para cada pila. Este ejemplo lo voy a ejecutar utilizando la implementación heap_3.c, y para ésta el valor de la constante configTOTAL_HEAP_SIZE no es tomado en cuenta, por lo que su valor es irrelevante (el porqué usé el esquema heap_3.c lo explico en la nota al final del ejemplo).

#define configTOTAL_HEAP_SIZE (( UBaseType_t )( 1500 )) // Irrelevante para heap_3.c

La salida de este programa es:

En este ejemplo nosotros sabemos dónde y porqué se agotó el heap y porqué el programa se detuvo, dado que ese es el objetivo del ejemplo; sin embargo, en un escenario real donde crees objetos dinámicos de forma dispersa será difícil determinar, con el método Serial.println(), dónde fue el punto exacto donde el programa falló. Para determinarlo deberás utilizar un depurador por hardware, tal como el AVR Dragon (para el chip ATmega328). Microcontroladores más modernos usan depuradores más potentes, o las tarjetas donde vienen montados (las conocidas como tarjetas de desarrollo o de evaluación) integran el soporte para la depuración por hardware.

Parte de mi colección de tarjetas de desarrollo. Puedes ver que algunas integran al depurador por hardware en la misma tarjeta, mientras que en otras conecté depuradores externos (ambos del tipo JTAG). Desafortunadamente es muy complicado depurar por hardware al ATmega328 en Linux.

*NOTA IMPORTANTE: En esta lección hablé sobre creación dinámica de tareas, y en la del día de hoy hablé de las diferentes implementaciones para las funciones malloc() y free(), y durante todo el curso he estado utilizando la versión heap_3.c, la cual como ya dije, utiliza las implementaciones de dichas funciones que vienen con el compilador avr-gcc. ¿Y qué sucede con las otras? NO FUNCIONAN con el micro/compilador ATmega328/avr-gcc. Así de simple, y por lo tanto, si no tienes una razón de peso para no utilizar el esquema heap_3.c, entonces quédate con él, o mejor aún, descarta el uso de objetos dinámicos y usa nada más objetos estáticos.

Lo anterior te lo he comentado por dos cosas:

  1. Para que no te lleves sorpresas desagradables si utilizas un esquema diferente a heap_3.c.
  2. Porque FreeRTOS incluye una función, xPortGetFreeHeapSize(), que te devuelve el número de palabras (que no de bytes) que quedan libres en el heap. Sin embargo, el esquema heap_3.c no es parte de FreeRTOS y por eso la función no es implementada. No obstante, ahí queda la información para cuando uses un microcontrolador y compilador diferente. Dicha función la puedes utilizar para afinar el tamaño del heap en una aplicación dada.

Ejemplo 2: Desbordando la pila

Para este y el siguiente ejemplo fue complicado encontrar ejemplos simples que desbordaran a la pila; lo mejor que encontré fue utilizar funciones recursivas (una función recursiva es aquella que se llama a sí misma de manera controlada). Y aún así no me fue posible ver la llamada a la función hook vApplicationStackOverflowHook() (de alguna manera el código generado esta súperoptimizado). Sin embargo, ya tienes los elementos para supervisar los problemas con la pila.

La función hook, en el archivo hooks.cpp, quedó así:

#if configCHECK_FOR_STACK_OVERFLOW > 0
void vApplicationStackOverflowHook( TaskHandle_t xTask, char *pcTaskName )
{
   Serial.print( "ERR: Desbordamiento de pila para la tarea: " );
   Serial.println( pcTaskName );

   vTaskSuspendAll();
   taskDISABLE_INTERRUPTS();

   while( 1 )
      ;
}
#endif

La función vTaskSuspendAll() suspende al planificador, esto es, ya no habrá cambios de contexto; para restaurar al planificador usa xTaskResumeAll(). Ambas funciones devuelven void y no reciben argumentos. La función vTaskSuspenAll() no detiene las interrupciones, para ello es necesario que llames a taskDISABLE_INTERRUPTS(); para reactivarlas usa taskENABLE_INTERRUPTS(). La documentación oficial menciona que este par de funciones deberían ser substituídas por taskENTER_CRITICAL() y taskEXIT_CRITICAL() (ambas funciones también devuelven void y no reciben argumentos), pero el contexto en que estoy usando me da para usar las funciones que te mencioné (y además internamente son diferentes). Puedes usar vTaskSuspendAll() y taskDISABLE_INTERRUPTS() en el Ejemplo 1.

El programa de prueba es:

#include <FreeRTOS.h>
#include <task.h>

// No olvides poner configCHECK_FOR_STACK_OVERFLOW a 1 o 2 en el archivo FreeRTOSConfig.h

#define PROGRAM_NAME "mem_mang_2.ino"


void led_task( void* pvParameters )
{
    uint8_t pin = (uint8_t) pvParameters;
    pinMode( pin, OUTPUT );

    while( 1 )
    {
        digitalWrite( pin, HIGH );
        vTaskDelay( pdMS_TO_TICKS( 200 ) );
        digitalWrite( pin, LOW );
        vTaskDelay( pdMS_TO_TICKS( 200 ) );
    }
}

void recursive( int val )
{
   if( val > 0 ){
      Serial.print( "Val = " );
      Serial.println( val );
      recursive( val - 1 );
   } else{
      return;
   }
}

void other_task( void* pvParameters )
{
   Serial.println( "Antes del desbordamiento ..." );

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 497 ) );
      recursive( 5000 );
   }
}

void setup()
{
   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   xTaskCreate( led_task, "LED", 128, (void*) 13, tskIDLE_PRIORITY, NULL );
   xTaskCreate( other_task, "OTHR", 128, NULL, tskIDLE_PRIORITY, NULL );

   vTaskStartScheduler();
}

void loop() {}

Al igual que en el ejemplo anterior, no olvides que lo que estamos haciendo es para descubrir problemas con la pila ANTES de que instales tu aplicación en la copa de un árbol muy alto en una selva. Una vez que has determinado que no hay problemas con las pilas, entonces deberías desactivar tanto su verificación como la del heap.

En una situación crítica donde la pila se desbordó quizás sea mejor idea mantener el código de vApplicationStackOverflowHook() lo más simple posible. Serial.print() está lejos de ser simple; en su lugar puedes activar un LED (si tu hardware y aplicación lo permiten):

#if configCHECK_FOR_STACK_OVERFLOW > 0
void vApplicationStackOverflowHook( TaskHandle_t xTask, char *pcTaskName )
{
   digitalWrite( 2, HIGH );

   vTaskSuspendAll();
   taskDISABLE_INTERRUPTS();

   while( 1 )
      ;
}
#endif

Para que funcione debes configurarlo en la función setup():

void setup()
{
   pinMode( 2, OUTPUT );
   digitalWrite( 2, LOW );

   // más código
}

Este ejemplo no tiene imagen de la salida de una ejecución porque fue difícil obtener algo

Ejemplo 3: Verificando la memoria disponible en la pila con uxTaskGetStackHighWaterMark()

Mencioné que la función uxTaskGetStackHighWaterMark() devuelve el número de elementos restantes de la pila que no han sido usados. En este ejemplo imprimiremos dichos valores, que para el ATmega328 el valor devuelto estará en bytes.

#include <FreeRTOS.h>
#include <task.h>

// No olvides poner configCHECK_FOR_STACK_OVERFLOW a 1 o 2 en el archivo FreeRTOSConfig.h

#define PROGRAM_NAME "mem_mang_3.ino"

void led_task( void* pvParameters )
{
    uint8_t pin = (uint8_t) pvParameters;

    pinMode( pin, OUTPUT );

    while( 1 )
    {
        digitalWrite( pin, HIGH );
        vTaskDelay( pdMS_TO_TICKS( 200 ) );
        digitalWrite( pin, LOW );
        vTaskDelay( pdMS_TO_TICKS( 200 ) );
    }
}

void recursive( int val )
{
   uint8_t arr[ val ];

   UBaseType_t high_watermark = uxTaskGetStackHighWaterMark( NULL );
   Serial.print( "Memoria restante: " );
   Serial.println( high_watermark );

   if( val > 0 ){
      Serial.print( "Val = " );
      Serial.println( val );

      vTaskDelay( pdMS_TO_TICKS( 50 ) );

      recursive( val - 1 );
   } else{
      return;
   }
}

void other_task( void* pvParameters )
{

   UBaseType_t high_watermark = uxTaskGetStackHighWaterMark( NULL );
   Serial.println( "Memoria antes del desbordamiento ..." );
   Serial.println( high_watermark );

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 497 ) );

      recursive( 30 );

      Serial.print( "MEMORIA RESTANTE: " );
      high_watermark = uxTaskGetStackHighWaterMark( NULL );
      Serial.println( high_watermark );
   }
}

void setup()
{
   // para stackOverflow:
   pinMode( 2, OUTPUT );
   digitalWrite( 2, LOW );

   Serial.begin( 115200 );
   Serial.println( PROGRAM_NAME );

   xTaskCreate( led_task, "LED", 128, (void*) 13, tskIDLE_PRIORITY, NULL );
   xTaskCreate( other_task, "OTHR", 128, NULL, tskIDLE_PRIORITY, NULL );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa cuando configCHECK_FOR_STACK_OVERFLOW es 1 es la siguiente:

Observa que a pesar de que la cantidad de memoria restante es cero, al menos entró dos veces más a la función recursiva. Y aunque la impresión serial ya no continuó, el programa hizo cosas raras, y muy importante, no llamó a la función vApplicationStackOverflowHook(); el programa pareció haberse perdido. Es natural, ya que el desbordamiento de una pila tiene consecuencias desastrosas.

Veamos la salida de este programa cuando configCHECK_FOR_STACK_OVERFLOW es 2:

Aquí puedes ver que el programa se detuvo antes de que la memoria de la pila llegara 0, y aunque no se ve, sí entró a la función vApplicationStackOverflowHook(). Ya te decía que la función Serial.print() no es exactamente simplona, por lo cual no aparece el mensaje de error; sin embargo, en su lugar sí que el LED asociado a D2 se activó, y a diferencia de la ejecución anterior, el programa efectivamente se detuvo y dejó de hacer cosas raras. La hook que utilicé para este ejemplo es:

#if configCHECK_FOR_STACK_OVERFLOW > 0
void vApplicationStackOverflowHook( TaskHandle_t xTask, char *pcTaskName )
{
   Serial.print( "ERR: Desbordamiento de pila para la tarea: " );
   Serial.println( pcTaskName );

   digitalWrite( 2, HIGH );

   vTaskSuspendAll();
   taskDISABLE_INTERRUPTS();

   while( 1 )
      ;
}
#endif

¿Qué sigue?

Hoy vimos un tema algo delicado, importante y muy técnico relacionado con los sistemas operativos: la administración de la memoria. Vimos conceptos y herramientas que nos permiten identificar mientras desarrollamos o probamos la aplicación posibles desbordamientos del heap o de las pilas. También vimos que de las 5 implementaciones de malloc() que FreeRTOS incluye, debemos usar heap_3.c para el ATmega328.

RECUERDA: Siempre que te sea posible evita el uso de memoria dinámica y prefiere en su lugar la memoria estática.

RECUERDA: Si te vez utilizando la implementación heap_1.c, entonces puedes considerar usar memoria estática.

RECUERDA: Si estás desarrollando para el ATmega328 utiliza la implementación heap_3.c.

RECUERDA: Prefiere la implementación heap_4.c en lugar de la heap_2.c (en plataformas que no tengan problemas con ellas.

RECUERDA: FreeRTOS no devuelve inmediatamente la memoria cuando llamas a free(). FreeRTOS espera a que haya tiempo libre en el sistema para devolverla (a través de la tarea interna Idle_task).

RECUERDA: Cuando utilices configCHECK_FOR_STACK_OVERFLOW para atrapar los desbordamientos de las pilas prefiere el Método 2 (valor 2); ya vimos que es más seguro.

RECUERDA: Las herramientas para atrapar los desbordamientos del heap y de las pilas deberías utilizarlos únicamente mientras estás desarrollando tu aplicación. Puedes auxiliarte con la compilación condicional para eliminarla en código de producción.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Colas

Hasta el momento en este curso hemos visto tres formas de comunicación inter-tareas:

  1. Con notificaciones directas a la tarea.
  2. Con flujos.
  3. Con mensajes.

Y nos queda una por explorar: las colas. Este fue el primer mecanismo que implementó FreeRTOS para pasar grandes cantidades de datos entre tareas o entre interrupciones y tareas.

La diferencia entre los 3 mecanismos listados y las colas es que éstas puedes ser escritas y leídas por dos o más tareas. Además, la escritura y lectura se realiza estrictamente elemento por elemento.

Aunque históricamente las colas fueron el primer mecanismo para la comunicación inter-tareas, con el paso del tiempo, la retroalimentación de los usuarios de FreeRTOS, y la evolución de éste, los administradores del proyecto se dieron cuenta que podían implementar un mecanismo más ligero para pasar datos de una tarea a otra, ya que casi todo el tiempo esta es la forma de trabajar.

Pero eso no quiere decir que las colas sean obsoletas o que deban ser descartadas; todo lo contrario. En algún momento te verás en la necesidad de utilizarlas. En la sección de ejemplos vamos a ver una tarea que la hace de «controlador» o «concentrador», mientras que dos tareas «producen» datos.

Aunque en el Arduino UNO no es posible lo siguiente, piensa en una interfaz gráfica. En éstas diversos periféricos generan eventos: una pantalla touch, algún botón o teclado físico, sensores de posición, temporizadores internos, puerto serial, etc. Todas estos eventos se guardan en una cola de eventos conforme van llegando. El «controlador» de la interfaz gráfica va despachando evento por evento, desde el más antiguo al más reciente (siempre en este orden).

Además, debes saber que las colas son la estructura de datos subyacente para otros mecanismos de FreeRTOS, como los semáforos (y su API primitiva) y los temporizadores por software.

Tabla de contenidos

¿Qué es y qué necesito?

Una cola (queue, en inglés, y pronunciada como [kiú]) es una estructura de datos con las siguientes características:

  • Es lineal. Es decir, todos los elementos, excepto el primero y el último, tienen un antecesor y un predecesor.
  • Se escribe por un extremo de la cola, y se lee por el extremo opuesto. Esto logra que el elemento que llegó primero sea el primero en ser atendido (al contrario de las pilas, donde el último elemento en llegar es el primero en ser atendido). Las colas también son conocidas como estructuras FIFO (first in, first out, primero en llegar, primero en salir).
  • El acceso a los elementos de enmedio, es decir, que no sean los extremos, está prohibido.
  • Aunque las colas son un tipo especial de listas, en las colas no se puede buscar un elemento ni se puede ordenar.

Las colas tienen dos operaciones básicas, más una auxiliar (muy útil), cuyos nombres varían de autor a autor, pero que significan lo mismo:

  • Insertar en la cola: Insert(), Insertar(), InsertFront(), Enqueue(), Push(), Send().
  • Extraer de la cola: Remove(), Remover(), RemoveBack(), Retrieve(), Dequeue(), Pop(), Receive().
  • Observar el elemento en el frente de la cola. Observar(), Peek().

(Los términos en negrita corresponden a los nombres que utiliza FreeRTOS para las respectivas operaciones, aunque yo en mis clases de Estructuras de Datos uso Enqueue(), Dequeue() y Peek(), pero como este es un curso de FreeRTOS, entonces utilizaré sus nombres para evitar confusiones.)

RECUERDA: Insertamos en la parte trasera (rear, back, tail) de la cola, y extraemos del frente (front, head) de la misma.

Al igual que con los mensajes, la operación Send() copia los datos en la cola, y Receive() copia los datos desde la cola. Si de plano es prohibitivo realizar las copias dado que cada dato sea muy grande, entonces puedes crear una cola de apuntadores (o referencias) y mantener los datos en búferes externos.

Para que puedas utilizar las colas en tus programas asegúrate incluir al encabezado #include <queue.h> en cada uno de ellos. Recuerda también que si vas a crear las colas con memoria dinámica, entonces deberás poner a 1 la constante configSUPPORT_DYNAMIC_ALLOCATION; y si vas a crear las colas con memoria estática, entonces deberás poner a 1 la constante configSUPPORT_STATIC_ALLOCATION, ambas en el archivo FreeRTOSConfig.h:

#define configSUPPORT_STATIC_ALLOCATION  1
#define configSUPPORT_DYNAMIC_ALLOCATION 1

Para facilitar el uso de las colas en nuestros programas escribí 4 constantes simbólicas, las cuales vamos a utilizar en los ejemplos de esta lección. De éstas tú estableces las dos primeras; las últimas 2 se calculan de manera automática. Por supuesto que puedes escribir directamente los valores donde sea necesario, pero en mi experiencia es preferible hacerlo de esta manera:

#define QUEUE_ELEMS ( 8 )

Es el número de elementos (no de bytes) de la cola.

#define QUEUE_TYPE uint16_t

Aquí estableces el tipo de dato del o de los elementos que va a guardar la cola. En este ejemplo se trata de variables uint16_t, pero podrían ser también de otros tipos básicos o tipos compuestos, como lo verás en los ejemplos.

#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )

Calcula el tamaño del tipo en bytes. No deberías manipularlo directamente.

#define QUEUE_BUFFER_SIZE ( ( QUEUE_ELEMS ) * ( QUEUE_TYPE_SIZE ) )

Calcula el número de bytes que necesita el búfer subyacente de la cola. No deberías manipularlo directamente.

Desarrollo

Las colas necesitan un búfer de datos interno, es decir, un lugar dónde guardar cada elemento. Este búfer es un arreglo de elementos uint8_t de tamaño ( QUEUE_ELEMS x QUEUE_TYPE_SIZE ) que estableces cuando creas a los objetos. Cuando utilizas creación dinámica de objetos este arreglo se crea de manera automática dentro de la función xQueueCreate(), por lo cual no debes preocuparte de nada (en apariencia. Siempre que te sea posible utiliza la creación estática de objetos, es más segura); mientras que si usas la creación estática de objetos, tú eres responsable de pasarle dicho arreglo a la función xQueueCreateStatic(). A continuación veremos ambas formas.

(Para conocer o recordar la diferencia y la configuración entre objetos dinámicos y estáticos, te recomiendo leer esta lección y esta otra lecciónrespectivamente. Aunque ambas hablan sobre tareas, la teoría aplica para cualquier tipo de objeto.)

CREACIÓN DINÁMICA

La función para crear mensajes dinámicos es:

uxQueueLength. Es el número máximo de elementos (no de bytes) que la cola almacenará. Puedes utilizar el valor QUEUE_ELEMS del que hablé hace un momento.

uxItemSize. Es el número de bytes que requiere un elemento. Puedes utilizar el valor QUEUE_TYPE_SIZE del que hablé hace un momento.

Valor devuelto. Si hubo memoria suficiente para crear la cola, entonces la función devuelve el handler; en caso contrario (no hubo memoria suficiente) devuelve NULL. Este handler lo debes de guardar, tanto para que el resto de funciones de flujos sepan sobre cuál van a operar, como para probar que efectivamente fue creado.

Por ejemplo, para crear una cola 10 elementos de tipo uint16_t, utilizando las constantes simbólicas anteriores, tendrías lo siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>

QueueHandle_t g_queue_handler = NULL;
// el handler debe ser global

#define QUEUE_ELEMS ( 10 )
#define QUEUE_TYPE uint16_t
#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )

void setup()
{
   // código ...

   g_queue_handler = xQueueCreate( QUEUE_ELEMS, QUEUE_TYPE_SIZE );

   configASSERT( g_queue_handler );
   // Error creando a la cola

   // más código ...
}

El handler, g_queue_buffer, es global porque debe ser visible tanto en la tarea productora como en la consumidora.

La creación dinámica de objetos no asegura que los objetos se creen, principalmente por escasez de memoria RAM. Por eso siempre deberías preguntar si el objeto realmente fue creado, y aunque existen diversas maneras de hacerlo, en este ejemplo escogí la macro configASSERT() que es una versión propia de la conocida assert() de C (en el archivo FreeRTOSConfig.h podrás ver la implementación que utilicé para el projecto Molcajete). Por supuesto que puedes usar al condicional if y tratar de recuperarte sin terminar el programa de manera abrupta.

Creación estática

Ahora veamos cómo crear a un mensaje de manera estática. Como es usual, esta forma de creación de objetos necesita más pasos, pero es mucho más segura ya que los objetos siempre serán creados.

uxQueueLength. Es el número máximo de elementos (no de bytes) que la cola almacenará. Puedes utilizar el valor QUEUE_ELEMS del que hablé hace un momento.

uxItemSize. Es el número de bytes que requiere un elemento. Puedes utilizar el valor QUEUE_TYPE_SIZE del que hablé hace un momento.

pucQueueStorageBuffer. Es el búfer interno de la cola donde se guardarán los datos, y es un arreglo de tamaño ( uxQueueLength * uxItemSize ) bytes de elementos de tipo uint8_t. Este arreglo deberá existir a lo largo del programa, por lo cual deberás crearlo de manera global al programa, o estática a la función donde fue llamada xQueueCreateStatic().

pxQueueBuffer. Esta es la variable que guarda el estado de la cola. Al igual que el parámetro pucQueueStorageBuffer, deberá existir a lo largo del programa.

Valor devuelto. El handler al mensaje. La creación estática de objetos nunca falla, a menos que el arreglo o la variable de estado sean NULL, cosa que tal vez nunca suceda.

Un ejemplo de su uso es:

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>

QueueHandle_t g_queue_handler = NULL;
// el handler debe ser global

#define QUEUE_ELEMS ( 10 )
#define QUEUE_TYPE uint16_t
#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )

void setup()
{
   // código ...

   static uint8_t       queue_buffer_array[ QUEUE_BUFFER_SIZE ];
   static StaticQueue_t queue_buffer_struct;

   g_queue_handler = xQueueCreateStatic( 
         QUEUE_ELEMS, 
         QUEUE_TYPE_SIZE,
         queue_buffer_array,
         &queue_buffer_struct );


   // más código ...
}

En este ejemplo marqué a las variables queue_buffer_array y queue_buffer_struct como static; esto para que perduren durante todo el programa, pero que a la vez sean invisibles al resto de funciones. Una alternativa es que las declares globales, aunque entre menos variables globales tengas en tus programas, mejor. Sin embargo el handler, g_message_buffer, sí que debe ser global porque tanto la tarea productora como la consumidora deben tener acceso a él. Este es un ejemplo de las veces en que sí son necesarias las variables globales. El prefijo g_ nos recuerda que es una variable global y que debemos tener mucho cuidado con ella.

Escribiendo y leyendo hacia y desde la cola

Una vez que la cola fue creada ya puedes utilizarla escribiendo en y leyendo desde ella. Cuando escribes a la cola lo haces elemento por elemento (y por copia, no lo olvides); estos elementos son variables de tipos básicos o tipos compuestos. En el Ejemplo 2 más adelante utilizaremos estos últimos.

La función para escribir en la cola es:

xQueue. Es el handler a la cola.

pvItemToQueue. Es un apuntador al dato que quieres escribir. Éste debe ser promocionado a void* (apuntador a void). Esta función escribe en la parte trasera de la cola.

xTicksToWait. Si no hay espacio disponible en la cola para copiar un nuevo elemento, entonces la tarea productora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Si el elemento se copió a la cola, entonces la función devuelve pdTRUE; en caso contrario devolverá errQUEUE_FULL.

Para leer un elemento desde la cola hacia el búfer de tu programa usamos la función xQueueReceive():

xQueue. Es el handler a la cola.

pvBuffer. Es un apuntador a la variable donde quieres guardar el elemento. Esta función lee del frente de la cola.

xTicksToWait. Si la cola está vacía, entonces no hay nada qué leer, por lo tanto la tarea consumidora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Si el elemento se leyó de la cola, entonces la función devuelve pdTRUE; en caso contrario devolverá pdFALSE. (Esta es una inconsistencia en la documentación oficial de FreeRTOS, ya que si xQueueSend() devuelve el valor errQUEUE_FULL en caso de que la cola esté llena, entonces xQueueReceive() debería devolver errQUEUE_EMPTY cuando la cola esté vacía, la cual ¡sí existe! y además tiene el mismo valor que pdFALSE. En lo que obtengo una respuesta por parte de FreeRTOS usemos pdFALSE, o haz que tus condicionales se prueben contra pdTRUE.)

Ejemplos

Una vez que tenemos claro el concepto de cola, su uso es bastante simple. Vamos a ver 4 ejemplos que mostrarán diversos aspectos de la utilización de las colas:

  1. Creación dinámica de una cola de tipo simple. Una tarea produce y una tarea consume.
  2. Creación estática de una cola de tipo simple. Dos tareas producen y una tarea consume.
  3. Creación estática de una cola de tipo compuesto. Dos tareas producen y una tarea consume.
  4. Creación estática de una cola de tipo compuesto. Dos tareas producen y una tarea consume. El handler es pasado en el parámetro pvParameters.

Ejemplo 1: Creación dinámica de una cola de tipo simple

Este primer ejemplo la cola es creada de manera dinámica y la tarea productora (la cual es una nada más) escribe en bloques de QUEUE_ELEMS elementos, y la tarea consumidora los consume tan pronto como es posible. El uso del ciclo for no es necesario, todo depende de la fuente de datos que desees transmitir, pero para este ejemplo se me hizo conveniente simular la llegada de n elementos y luego hacer una pausa, y repetir el procedimiento:

#define PROGRAM_NAME "queues_1.ino"

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>


QueueHandle_t g_queue_handler = NULL;
// el handler debe ser global


#define QUEUE_ELEMS ( 10 )
// ¿Cuántos elementos tendrá la cola?

#define QUEUE_TYPE uint16_t
// ¿De qué tipo es el búfer?

#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo QUEUE_TYPE?


void producer_task( void* pvParameters )
{
   QUEUE_TYPE data = 10;

   while( 1 )
   {
      for( uint8_t i = 0; i < QUEUE_ELEMS; ++i ){
         if( xQueueSend( g_queue_handler, (void*) &data, pdMS_TO_TICKS( 100 ) ) != errQUEUE_FULL ){

            data += 10;

            vTaskDelay( pdMS_TO_TICKS( 10 ) );
            // simulamos un retardo entre escritura y escritura
         } 
         else{ // timeout:
            Serial.println( "TO(W)" );
         }
      }

      vTaskDelay( pdMS_TO_TICKS( 500 ) );
   }
}

void consumer_task( void* pvParameters )
{
   while( 1 )
   {
      QUEUE_TYPE data;

      while( xQueueReceive( g_queue_handler, &data, pdMS_TO_TICKS( 500 ) ) != errQUEUE_FULL ){
         Serial.println( data );
      }
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_queue_handler = xQueueCreate( QUEUE_ELEMS, QUEUE_TYPE_SIZE );

   configASSERT( g_queue_handler );
   // Error creando a la cola

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( PROGRAM_NAME );

   vTaskStartScheduler();
}

void loop() {}

De este ejemplo nota que utilicé las constantes simbólicas que describí hace un momento, y que la variable que deseas copiar a la cola debe ser promocionada a void*, como también ya lo había mencionado:

xQueueSend( g_queue_handler, (void*) &data, pdMS_TO_TICKS( 100 ) )

Así mismo, antes de salir de la función setup() he agregado una función que imprime el nombre del programa. Esto no es necesario, pero como para este curso he creado muchos programas, nunca estoy seguro cuál es el último que subí a mis tarjetas.

La salida de este programa es:

Ejemplo 2: Creación estática de una cola de tipo simple

En este ejemplo vamos a crear una cola de un tipo básico de manera estática. Además, dos tareas van a escribir en ella, y una tarea leerá de ella. Una tarea productora escribirá valores múltiplo de 10, mientras que la otra escribirá valores múltiplo de 7, y cuando llegue al valor arbitrario 350 (no tiene nada de especial), volverá a 0. Esto lo hice así para distinguir en la salida cuál tarea productora escribió cuál valor.

En los retardos notarás valores raros que realmente son números primos. Los utilicé para que no hubiera relación entre las diferentes temporizaciones tratando de simular un ambiente real. Debido a los diferentes tiempos de escritura y lectura la cola se llenará y presentarán los mensajes: TO(P2), el cual significa timeout en el productor 2; o el mensaje TO(P1), el cual significa timeout en el productor 1 (en ambos casos significa que la cola estaba llena y no pudieron escribir en ella).

Finalmente agregué una constante, USER_DBG, que permite habilitar (en diferentes grados) o deshabilitar información de depuración. Por supuesto que esto no sería necesario en un programa real (¿o tal vez sí?) pero como estamos experimentando es interesante saber lo que sucede en el programa.

#define PROGRAM_NAME "queues_2.ino"

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>


QueueHandle_t g_queue_handler = NULL;


#define QUEUE_ELEMS ( 10 )
// ¿Cuántos elementos tendrá la cola?

#define QUEUE_TYPE uint16_t
// ¿De qué tipo es el búfer?

#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo QUEUE_TYPE?

#define QUEUE_BUFFER_SIZE ( ( QUEUE_ELEMS ) * ( QUEUE_TYPE_SIZE ) )
// ¿Cuántos bytes tiene el búfer subyacente?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())



void producer1_task( void* pvParameters )
{
   QUEUE_TYPE data = 10;

   while( 1 )
   {

#if USER_DBG > 1
      Serial.print( "TX1: " ); Serial.println( data );
#endif  

      if( xQueueSend( g_queue_handler, (void*) &data, pdMS_TO_TICKS( 100 ) ) == pdFALSE ){
         Serial.println( "TO(P1)" );
      } else{
         data += 10;
         if( data > 1000 ) data = 10;
      }

      vTaskDelay( pdMS_TO_TICKS( 439 ) );
   }
}

void producer2_task( void* pvParameters )
{
   QUEUE_TYPE data = 0;

   while( 1 )

   {
#if USER_DBG > 1
      Serial.print( "TX2: " ); Serial.println( data );
#endif  

      if( xQueueSend( g_queue_handler, (void*) &data, pdMS_TO_TICKS( 100 ) ) != errQUEUE_FULL ){

         data += 7;
         if( data > 350 ) data = 0;

      } 
      else{ //timeout:
         Serial.println( "TO(P2)" );
      }

      vTaskDelay( pdMS_TO_TICKS( 397 ) );
   }
}

void consumer_task( void* pvParameters )
{
   TickType_t last_wake_time = xTaskGetTickCount();

   while( 1 )
   {
      QUEUE_TYPE data;

      vTaskDelayUntil( &last_wake_time, pdMS_TO_TICKS( 311 ) );

      if( xQueueReceive( g_queue_handler, &data, pdMS_TO_TICKS( 509 ) ) == pdTRUE ){

#if USER_DBG > 0
         Serial.print( "RX: " ); Serial.println( data );
#endif  

#if USER_DBG > 1
         Serial.print( "SA: " ); Serial.println( uxQueueSpacesAvailable( g_queue_handler ) );
#endif  

      } else{ //timeout:
         Serial.println( "TO(R) " );
      }
   }
}

void setup()
{
   xTaskCreate( producer1_task, "PRD1", 128, NULL, tskIDLE_PRIORITY + 1, NULL );
   xTaskCreate( producer2_task, "PRD2", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t queue_buffer_array[ QUEUE_BUFFER_SIZE ];
   // búfer subyacente de la cola

   static StaticQueue_t queue_buffer_struct;
   // variable de estado de la cola

   g_queue_handler = xQueueCreateStatic( 
         QUEUE_ELEMS, 
         QUEUE_TYPE_SIZE,
         queue_buffer_array,
         &queue_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( PROGRAM_NAME );


   vTaskStartScheduler();
}

void loop() {}

La salida de este programa con USER_DBG puesta a 1 es:

Ejemplo 3: Creación estática de una cola de tipo compuesto

En ocasiones tendremos que transferir datos compuestos debido a que así está diseñado el sistema (no todo en la vida son tipos simples), o porque, como mencioné en la introducción, necesitamos saber quién escribió qué. En el ejemplo que dí sobre un controlador para una interfaz gráfica, el «controlador» debe conocer qué periférico generó el evento. Pero dado que el tipo de la cola es fijo debemos idear una manera de homogenizar la información; esto es, que cada periférico escriba información del mismo tipo.

El ejemplo que doy a continuación es una versión alterna y completa, pero simple, de un ejemplo que FreeRTOS ha incluído en su documentación oficial para encarar dicho problema.

Lo primero que vas a notar es la enumeración Event, la cual indica la fuente generó la información. Después viene el tipo compuesto, Data, que encapsula la fuente generadora y el dato generado:

typedef enum Event
{
   Sensor1,
   Sensor2,
} Event;

typedef struct Data
{
   Event    evt;
   uint16_t value;
} Data;

Luego cada una de las dos tareas que incluí en el ejemplo se identifican de manera permanente en el campo .evt de una variable data del tipo Data, dado que lo único que va a cambiar mientras el programa se esté ejecutando es el valor escrito en el campo .value de dicha variable (que hace las veces de búfer). Una tarea productora toma valores de temperatura, mientras que otra genera una secuencia numérica, y a diferencia de lo que hemos venido haciendo, ésta se puede bloquear (entrar al estado Blocked) por tiempo indefinido (solamente como ejemplo, en la vida real deberías evitar esta situación).

Finalmente, la tarea consumidora, que hace las veces de «controlador», implementa un switch sobre el campo .evt para determinar de dónde provienen los datos y proceder de manera correcta.

#define PROGRAM_NAME "queues_3.ino"

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>

typedef enum Event
{
   Sensor1,
   Sensor2,
} Event;

typedef struct Data
{
   Event    evt;
   uint16_t value;
} Data;

QueueHandle_t g_queue_handler = NULL;


#define QUEUE_ELEMS ( 5 )
// ¿Cuántos elementos tendrá la cola?

#define QUEUE_TYPE Data
// ¿De qué tipo es el búfer?

#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo QUEUE_TYPE?

#define QUEUE_BUFFER_SIZE ( ( QUEUE_ELEMS ) * ( QUEUE_TYPE_SIZE ) )
// ¿Cuántos bytes ocupa el búfer subyacente?


#define USER_DBG 1

void sensor1_task( void* pvParameters )
{
   QUEUE_TYPE data = { .evt = Sensor1, .value = 0 };

   while( 1 )
   {
      data.value = analogRead( A0 ) & 0x03FF;
      // sensor de temperatura LM35
      
      if( xQueueSend( g_queue_handler, (void*) &data, pdMS_TO_TICKS( 100 ) ) == errQUEUE_FULL ){
         Serial.println( "TO(S1)" );
      }

      vTaskDelay( pdMS_TO_TICKS( 457 ) );
   }
}

void sensor2_task( void* pvParameters )
{
   QUEUE_TYPE data = { .evt = Sensor2, .value = 0 };

   while( 1 )
   {
      xQueueSend( g_queue_handler, (void*) &data, portMAX_DELAY );

      data.value += 10;
      if( data.value > 500 ) data.value = 0;

      vTaskDelay( pdMS_TO_TICKS( 397 ) );
   }
}

void controller_task( void* pvParameters )
{
   TickType_t last_wake_time = xTaskGetTickCount();

   while( 1 )
   {
      QUEUE_TYPE data;

      vTaskDelayUntil( &last_wake_time, pdMS_TO_TICKS( 257 ) );

      if( xQueueReceive( g_queue_handler, &data, pdMS_TO_TICKS( 509 ) ) == pdTRUE ){

         switch( data.evt )
         {
         case Sensor1:
            Serial.print( "SENSOR1: " );
            Serial.println( data.value );
         break;

         case Sensor2:
            Serial.print( "SENSOR2: " );
            Serial.println( data.value );
         break;

         default:
            Serial.println( "ERR" );
         break;
         }

      } else{ //timeout:
         Serial.println( "TO(R) " );
      }
   }
}

void setup()
{
   xTaskCreate( sensor1_task, "SNS1", 128, NULL, tskIDLE_PRIORITY + 1, NULL );
   xTaskCreate( sensor2_task, "SNS2", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( controller_task, "CTRL", 128, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t       queue_buffer_array[ QUEUE_BUFFER_SIZE ];
   static StaticQueue_t queue_buffer_struct;

   g_queue_handler = xQueueCreateStatic( 
         QUEUE_ELEMS, 
         QUEUE_TYPE_SIZE,
         queue_buffer_array,
         &queue_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( PROGRAM_NAME );


   vTaskStartScheduler();
}

void loop() {}

La salida de este programa es:

En el ejemplo anterior escribí el valor en bruto entregado por el sensor de temperatura LM35, que al momento de ejecutar el programa reportaba valores alrededor de 52-53. Pero si quisieras ver el valor correspondiente a la temperatura podrías modificar el case de Sensor1:

case Sensor1:
  Serial.print( "SENSOR1: " );
  Serial.println( (float) (data.value*500.0/1024.0) );
break;

(En esta entrada mi blog alternativo describí las diferentes partes de la fórmula, por si te interesa conocerlas.)

La salida del programa ya con la temperatura en grados centígrados es:

Ejemplo 4: Creación estática de una cola de tipo compuesto. El handler es pasado en el parámetro pvParameters.

NOTA: La siguiente solución no funciona cuando uno de los productores o uno de los consumidores es una interrupción, ya que la única forma de que ésta tenga acceso al handler es haciéndolo global.

En todos los ejemplos vistos el handler a la cola fue declarado como global y cualquier tarea podría escribir hacia o leer desde ella. En muchas situaciones esto quizás no sea importante, pero en otras sería mejor restringir qué tareas tienen acceso a la cola.

Ya en la lección sobre Semáforos Mutex exploramos cómo limitar el acceso a un monitor solamente por las tareas interesadas. Aquella vez usamos, y hoy lo volveremos a hacer, el parámetro pvParameters (que cada tarea incluye) para pasarle el handler. De esta manera solamente las tareas involucradas podrán accesar a la cola. Así mismo, el handler a la misma deja de ser global (entre menos globales mejor) ya que la declararemos static en la función setup().

En principio esta modificación es sencilla, pero el demonio está en los detalles, y como siempre sucede en C/C++, el demonio se presenta en forma de apuntador. Voy a diseccionar el código para intentar explicar el procedimiento; al final de la sección estará el programa completo. ¡No parpadees!

En la función setup() declaramos como static el handler a la cola. Luego le pasamos a las funciones de creación de tareas la dirección del handler promocionada a void*. (El handler ya es un apuntador, así que estamos pasando la dirección del apuntador.)

void setup()
{
   static QueueHandle_t queue_handler;
   // este es el handler (internamente es un apuntador)

   xTaskCreate( 
      sensor1_task, 
      "SNS1", 
      128, 
      (void*) &queue_handler, // aquí pasamos la dirección del handler
      tskIDLE_PRIORITY + 1, 
      NULL );

   xTaskCreate( sensor2_task, "SNS2", 128, (void*) &queue_handler, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( controller_task, "CTRL", 128, (void*) &queue_handler, tskIDLE_PRIORITY, NULL );

   // más código ...
}

Después, en cada tarea que deba recibir al handler de la cola debemos crear una variable que guarde la dirección del handler. Finalmente tenemos que desreferenciar dicha variable para accesar al handler propiamente dicho:

void sensor1_task( void* pvParameters )
{
   QueueHandle_t* queue = (QueueHandle_t*) pvParameters;
   // queue guarda la dirección del handler

   while( 1 )
   {
      if( xQueueSend( 
           *queue,        // desreferenciamos la variable queue para obtener el handler
           (void*) &data, 
           pdMS_TO_TICKS( 100 ) ) == pdFALSE ){
         
           // más código ...
      }
      // más código ...
   }
}

¡Wow, esos apuntadores que nos vuelven locos! A continuación te presento el programa completo:

#define PROGRAM_NAME "queues_4.ino"

#include <FreeRTOS.h>
#include <task.h>
#include <queue.h>

typedef enum Event
{
   Sensor1,
   Sensor2,
} Event;

typedef struct Data
{
   Event    evt;
   uint16_t value;
} Data;


#define QUEUE_ELEMS ( 5 )
// ¿Cuántos elementos tendrá la cola?

#define QUEUE_TYPE Data
// ¿De qué tipo es el búfer?

#define QUEUE_TYPE_SIZE ( sizeof( QUEUE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo QUEUE_TYPE?

#define QUEUE_BUFFER_SIZE ( ( QUEUE_ELEMS ) * ( QUEUE_TYPE_SIZE ) )
// ¿Cuántos bytes el búfer subyacente?


void sensor1_task( void* pvParameters )
{
   QueueHandle_t* queue = (QueueHandle_t*) pvParameters;

   QUEUE_TYPE data = { .evt = Sensor1, .value = 0 };

   while( 1 )
   {
      data.value = analogRead( A0 ) & 0x03FF;
      // sensor de temperatura LM35
      
      if( xQueueSend( *queue, (void*) &data, pdMS_TO_TICKS( 100 ) ) == pdFALSE ){
         Serial.println( "TO(S1)" );
      }

      vTaskDelay( pdMS_TO_TICKS( 457 ) );
   }
}

void sensor2_task( void* pvParameters )
{
   QueueHandle_t* queue = (QueueHandle_t*) pvParameters;

   QUEUE_TYPE data = { .evt = Sensor2, .value = 0 };

   while( 1 )
   {
      xQueueSend( *queue, (void*) &data, portMAX_DELAY );

      data.value += 10;
      if( data.value > 500 ) data.value = 0;

      vTaskDelay( pdMS_TO_TICKS( 397 ) );
   }
}

void controller_task( void* pvParameters )
{
   QueueHandle_t* queue = (QueueHandle_t*) pvParameters;

   TickType_t last_wake_time = xTaskGetTickCount();

   while( 1 )
   {
      vTaskDelayUntil( &last_wake_time, pdMS_TO_TICKS( 257 ) );

      QUEUE_TYPE peek;
      if( xQueuePeek( *queue, &peek, 0 ) == pdFALSE ){
         Serial.println( "NO EVENTS" );
      } else{
         Serial.print( "NEXT EVENT: " );
         Serial.println( peek.evt );
      }

      QUEUE_TYPE data;

      if( xQueueReceive( *queue, &data, pdMS_TO_TICKS( 509 ) ) == pdTRUE ){

         switch( data.evt )
         {
         case Sensor1:
            Serial.println( (float) ( data.value*500.0/1024.0) );
         break;

         case Sensor2:
            Serial.println( data.value );
         break;

         default:
            Serial.println( "ERR" );
         break;
         }

      } else{ //timeout:
         Serial.println( "TO(R) " );
      }
   }
}

void setup()
{
   static QueueHandle_t queue_handler;

   xTaskCreate( sensor1_task, "SNS1", 128, (void*) &queue_handler, tskIDLE_PRIORITY + 1, NULL );
   xTaskCreate( sensor2_task, "SNS2", 128, (void*) &queue_handler, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( controller_task, "CTRL", 128, (void*) &queue_handler, tskIDLE_PRIORITY, NULL );


   static uint8_t       queue_buffer_array[ QUEUE_BUFFER_SIZE ];
   static StaticQueue_t queue_buffer_struct;

   queue_handler = xQueueCreateStatic( 
         QUEUE_ELEMS, 
         QUEUE_TYPE_SIZE,
         queue_buffer_array,
         &queue_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( PROGRAM_NAME );


   vTaskStartScheduler();
}

void loop() {}

La función controller_task() usa una operación muy importante de las colas que mencioné en la introducción, la operación xQueuePeek(), la cual no habíamos utilizado hasta ahora, cuya función es devolver la copia del elemento al frente de la cola, pero sin extraerlo. No era necesaria en el ejemplo, pero quise que vieras su utilización.

La salida del programa es:

Otras funciones

La API de colas es muy grande (la puedes revisar aquí), por lo que sólo te voy a platicar de algunas funciones que pudieran ser importantes.

Colas e interrupciones

Como ya es costumbre en FreeRTOS, debes tener mucho cuidado con las funciones que llames desde dentro de una interrupción. Cuando se dé el caso asegúrate que las funciones sean thread-safe, es decir, aquellas que terminan en FromISR. Las versiones thread-safe de las funciones que hemos vistos hasta el momento son:

BaseType_t xQueueSendFromISR(    QueueHandle_t xQueue, const void *pvItemToQueue, BaseType_t *pxHigherPriorityTaskWoken );
BaseType_t xQueueReceiveFromISR( QueueHandle_t xQueue, void *pvBuffer, BaseType_t *pxHigherPriorityTaskWoken );
BaseType_t xQueuePeekFromISR(    QueueHandle_t xQueue, void *pvBuffer );

Los primeros dos argumentos son idénticos a sus contrapartes; el último, pxHigherPriorityTaskWoken, indica si una tarea de mayor prioridad a la que está ejecutándose salió del estado Blocked cuando la respectiva función fue llamada y, en consecuencia, debe ser ejecutada a continuación.

Nota que ninguna de estas funciones incluye al parámetro de tiempo de espera, xTicksToWait, ya que no tiene sentido (y jamás debería suceder) que una tarea se bloquee dentro de una interrupción.

En esta lección podrás ver cómo se utiliza el parámetro pxHigherPriorityTaskWoken en una interrupción. No creo que tengas problemas en adaptar el ejemplo para usar colas.

Funciones para ver o alterar el estado de la cola

A veces es necesario preguntar por la cantidad de elementos que aún pueden ser insertados en la cola, o por la cantidad de elementos actualmente en la cola. FreeRTOS proporciona sendas funciones:

UBaseType_t uxQueueSpacesAvailable( QueueHandle_t xQueue );
// devuelve el número de elementos que aún pueden ser insertados

UBaseType_t uxQueueMessagesWaiting( QueueHandle_t xQueue );
// devuelve el número de elementos actualmente en la cola

UBaseType_t uxQueueMessagesWaitingFromISR( QueueHandle_t xQueue );
// es la versión thread-safe de uxQueueMessagesWaiting

IMPORTANTE: Nunca uses uxQueueMessagesWaiting() ni uxQueueSpacesAvailable() en situaciones como la siguiente, porque el programa no funcionará (ya le pasó al primo de un amigo. Aquí la discusión completa):

while( 1 )
{
  if( uxQueueMessagesWaiting( queue_handler ) > 0 ){
      // aquí consumes los datos
  } 
  // El programa llega aquí si no hay elementos en la cola, pero el proceso puede ser tan
  // rápido que no "preste" la CPU y el programa se quede colgado, dado que dentro de
  // uxQueueMessagesWaiting() las interrupciones se desactivan y reactivan (en ese orden)
}

En su lugar agrega un else que llame a una función bloqueante:

while( 1 )
{
  if( uxQueueMessagesWaiting( queue_handler ) > 0 ){
      // aquí consumes los datos
  } 
  else {
     vTaskDelay( 1 ); // o cualquier otra función que se bloquée
  }
}

Cuando necesites «limpiar» la cola, sin necesidad de destruirla y volverla a crear, puedes usar a la función xQueueReset:

BaseType_t xQueueReset( QueueHandle_t xQueue );
// siempre devuelve pdTRUE. Pone a su estado original a la cola

FreeRTOS incluye un par de funciones que deben ser usadas dentro de interrupciones para preguntar si una cola está llena o vacía:

BaseType_t xQueueIsQueueFullFromISR( const QueueHandle_t xQueue );
// devuelve pdTRUE si la cola está llena; o pdFALSE en caso contrario

BaseType_t xQueueIsQueueEmptyFromISR( const QueueHandle_t xQueue );
// devuelve pdTRUE si la cola está vacía; o pdFALSE en caso contrario

NOTA: De manera extraña no existen las contrapartes de estas funciones que se puedan usar en tareas regulares.

¿Qué sigue?

Hoy vimos el 4to mecanismo que FreeRTOS incluye para llevar a cabo la comunicación inter-tareas, la cual es una actividad fundamental en cualquier programa secuencial o concurrente.

Estudiamos las funciones más importantes de la API de las colas; sin embargo, siempre podrás estudiar por tu cuenta aquellas que no vimos.

RECUERDA: Utiliza las constantes que te di al principio de esta lección, en lugar de calcular por tu cuenta los diferentes tamaños de búferes.

RECUERDA: Siempre que te sea posible utiliza objetos estáticos.

RECUERDA: Si vas a llamar a una función de FreeRTOS desde una interrupción, verifica que ésta sea la versión thread safe, es decir, tenga terminación FromISR.

RECUERDA: Las colas se utilizan cuando en la transferencia de datos intervienen dos o más productores, o dos o más consumidores. Si la transferencia se va a llevar uno-a-uno, entonces podrías usar las versiones «ligeras», como los mensajes.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Semáforos Mutex

Nuestros sistemas embebidos tienen un conjunto limitado de recursos, ya sean internos (UART, SPI, I2C, USB, ADC, DAC, áreas de memoria (búfers), etc), o externos (LCD, teclado, etc). En programas secuenciales (mono-tarea o de tarea única) prácticamente no existe ningún peligro de que una función que escribe en la UART sea interrumpido por otro; lo normal es que una función escriba (o lea) de la UART de principio a fin. Después, otra función, que en la lógica del programa va después, podrá utilizar a la UART de manera segura.

Sin embargo, la programación concurrente o multi-tarea trae consigo una serie de problemas de los cuales debemos estar conscientes y conocer las diferentes técnicas que existen para resolverlos. Uno de estos problemas tiene que ver con los recursos compartidos. Un recurso compartido es un recurso (como los mencionados) que puede ser utilizado por diversas partes del programa y es en lo que vamos a concentrarnos en esta lección.

Otros problemas asociados con la programación concurrente son: deadlocks (bloqueo mutuo), starvation (inanición), y priority inversion (inversión de prioridad). Ninguno es bueno.

Un problema que se presenta en los programas concurrentes es la competencia entre tareas por un mismo recurso compartido (race condition, en inglés, y condición de carrera, en español). Esto es, mientras una tarea de baja prioridad está utilizando un recurso (la impresión a través de la UART, por ejemplo) el sistema operativo decide que es momento de darle la CPU a una tarea de más alta prioridad, y esta otra tarea también utiliza a la UART imprimiendo sus propios mensajes. Cuando termina le devuelve la CPU a la tarea que había interrumpido para que continúe. La salida de un programa de esta naturaleza se ve así:

La tarea de baja prioridad imprime el texto: «abcd...789«; mientras que la tarea de alta prioridad imprime: «HOLA ... mundo«. Nota la mezcla de los textos. Lo mismo podría suceder con una pantalla LCD (ya me pasó), o con las conversiones del ADC, o con la transmisión de datos por SPI o I2C.

La impresión de texto mezclado puede llegar a ser hasta curiosa; sin embargo, el problema podría ser más grave. Por ejemplo, el convertidor ADC del ATmega328 corrompe las lecturas si a la mitad de una conversión inicias otra (ya me pasó, también). El ADC es un sólo módulo con 6 canales. Si tienes dos o más tareas realizando lecturas independientes en dos o más canales diferentes, entonces corres el peligro de obtener lecturas erróneas, y por lo tanto, nada fiables. Una solución es tener un concentrador para el ADC (le decimos «proxy» en el argót), o utilizar un mutex, lo cual, además de ser más fácil, es el tema de la lección de hoy.

A modo de anécdota te platico sobre uno de mis diseños, la tarjeta tipo PLC UB-PLR328. Cuando la estaba programando enfrenté el siguiente reto, al cual quizás tú también ya te enfrentaste: utilizar I2C para controlar diversos dispositivos. En particular la UB-PLR328 utiliza el canal I2C para 3 cosas: leer la hora de un reloj/calendario MCP79410, escribir a un LCD vía un PCF8574, y leer de un teclado vía otro PCF8574 (aquí puedes ver un video donde muestro los 3 elementos trabajando). Si suponemos que el acceso a cada uno de estos dispositivos es secuencial en un programa mono-tarea, entonces no tendremos preocupaciones mayores, quizás un poco con la lectura y decodificación del teclado. Sin embargo, en un programa multi-tarea sí que debemos preocuparnos y extremar precauciones, ¿qué tal si mientras una tarea quiere leer la hora, otra tarea quiere decodificar al teclado, y otra escribir al LCD? Continúa leyendo para que veas cómo podemos resolver esta situación utilizando mutex‘es.

Una forma de atacar este problema es con la exclusión mutua: evitar que una tarea accese a un recurso compartido mientras éste está siendo utilizado. Para implementar la exclusión mutua usamos semáforos mutex (le estaré diciendo «el mutex» a partir de ahora). El nombre mutex viene del inglés MUTual EXclusion. Los mutex son semáforos binarios con un par de características interesantes que ya veremos más adelante. El funcionamiento de la exclusión mutua es que mientras una tarea posea al mutex ninguna otra tarea podrá accesar al recurso compartido. Una vez que la tarea que tenía al mutex lo devuelve, entonces la otra tarea que esperaba por él lo obtendrá y podrá accesar al recurso. Hoy vamos a estudiar los mutex y las funciones de semáforos de FreeRTOS.

Además, y como agradecimiento a tu apoyo para la realización de este curso, vamos a estudiar brevemente a los monitores, los cuales son una versión de alto nivel para implementar la exclusión mutua. Veremos sus características y su implementación como clases de C++.

Tabla de contenidos

¿Qué es y qué necesito?

Exclusión mutua y semáforos mutex

La exclusión mutua se implementa a partir de identificar las secciones críticas del programa; esto es, encontrar los lugares donde, para el problema que nos concierne, están ubicados los accesos al recurso compartido y convertirlas en operaciones atómicas (operaciones que no pueden ser interrumpidas hasta terminar). Para el ejemplo de la UART (y el servicio asociado Serial.print() de Arduino) el pseudocódigo de una sección crítica se vería así:

Tarea()
{
   While( 1 )
   {
      // código ...

      critical_section_enter()       // entramos a la sección crítica
      Serial.println( "Hola mundo" ) // usamos al recurso como una operación atómica
      critical_section_exit()        // salimos de la sección crítica

      // ... más código
   }
}

La entrada y salida de una sección crítica puede ser implementada de diversas formas, incluyendo desactivar las interrupciones mientras el recurso es utilizado. Afortunadamente nosotros no debemos hacer eso; en su lugar utilizaremos semáforos mutex.

Un semáforo mutex es un semáforo binario con dos características especiales:

  1. El mutex puede heredar prioridades de manera temporal. Digamos que una tarea de baja prioridad, TL, posee al mutex, y mientras esto sucede una tarea de mayor prioridad, TH, desea adquirlo. Bajo estas condiciones el sistema operativo eleva la prioridad de TL al mismo nivel que TH de manera temporal. De esta forma se asegura hasta cierto punto, que TL devolverá el mutex lo más pronto posible para que TH lo utilice. Cuando TL devuelve el mutex su prioridad es restablecida. Los semáforos binarios no tienen esta característica.
  2. El mutex debe ser devuelto. La tarea que posee al mutex debe devolverlo tan pronto como termine su trabajo para que otras tareas lo puedan obtener, y en consecuencia, accesar al recurso. No es un requisito que los semáforos deban ser devueltos, de hecho, casi nunca se hace.

RECUERDA: Mientras un semáforo binario se utiliza para sincronización y señalización (por ejemplo, una ISR lo da de manera constante, mientras que una tarea lo obtiene también de manera constante), los mutex’es se utilizan para proteger recursos compartidos de ser accesados por dos o más tareas al mismo tiempo.

El pseudocódigo anterior, utilizando mutex’es, nos quedaría así:

Tarea()
{
   While( 1 )
   {
      // código ...

      take( mutex )                  // obtiene al mutex
      Serial.println( "Hola mundo" ) // usa al recurso
      give( mutex )                  // devuelve al mutex
      
      // ... más código
   }
}

Si el mutex no estuviera disponible cuando se hace la llamada take(), entonces el sistema operativo manda a la tarea al estado Blocked y la inserta en una cola de espera. Cuando el recurso es liberado a través de la llamada a give(), entonces el sistema operativo saca de la cola de espera a la tarea que más tiempo ha estado esperando por el recurso.

Un poco más adelante veremos las funciones de FreeRTOS para semáforos mutex. A continuación abordaré brevemente el tema de los monitores, los cuales harán a nuestros programas más seguros.

Monitores

Un monitor es un tipo abstracto que encapsula al recurso y al mutex, y expone las únicas operaciones que pueden ser realizadas sobre el recurso.

En la descripción dada sobre mutex’es, y en general, sobre semáforos, notarás que estos no tienen dueño ni control sobre quién lo da y quién lo obtiene. Este es un problema «normal» con los semáforos: cualquier tarea podría obtenerlo y no devolverlo, o utilizarlo sobre un recurso diferente para el que fue creado.

Una forma de enfrentar estos problemas es encapsulando en una misma entidad (tipo abstracto, clase) al mutex junto con el recurso, y exponer un conjunto básico de operaciones que serán las únicas que podrán accesar al recurso. De esta forma sería muy difícil utilizar de manera incorrecta al recurso, o aplicarle el mutex a un recurso diferente.

En los ejemplos de mutex’es que veremos más adelante el recurso a proteger es la impresión serial a través de la función Serial.print(), y podremos usar a esta función de todas las maneras posibles que necesitemos. Pero cuando implementemos al monitor solamente dos operaciones estarán disponibles: escribir una cadena de texto completa, y escribir una cadena de texto completa caracter por caracter. El cliente del monitor no puede hacer nada más, dado que esas son las únicas operaciones expuestas, y siempre operarán sobre el mismo recurso. Por otro lado, podría ser que el cliente ni siquiera se entere que dentro del monitor existe un mutex (ventajas de los tipos abstractos); o podría suceder que el cliente vea al monitor como si fuese el propio recurso (esto sería lo más deseable). Aunque más adelante veremos a detalle la implementación del monitor, te quiero presentar las únicas dos operaciones que decidí sobre el recurso serial:

class Monitor
{
private:

   // declaración del mutex y el recurso
   
public:

   // aquí va el constructor

   // estas son las únicas operaciones permitidas sobre el recurso:

   bool PrintAll(      const char* text );
   bool PrintOneByOne( const char* text, uint8_t len );
};

Mutex’es de FreeRTOS

Para usar mutex’es necesitas utilizar las primitivas de FreeRTOS de semáforos. En lecciones anteriores vimos cómo implementar semáforos binarios y semáforos contadores con un mecanismo nuevo y de bajo consumo de recursos de FreeRTOS: las notificaciones directas a la tarea. Sin embargo, por las características especiales ya mencionadas de los mutex’es sólo hay una forma de implementación en FreeRTOS y es utilizando su API primitiva de semáforos. Para ello tienes que activar esta funcionalidad en el archivo FreeRTOSConfig.h:

#define configUSE_MUTEXES 1

Y también tienes que incluir en tus códigos fuente el archivo de encabezado #include <semphr.h>. Como dato interesante debes tener en cuenta que las primitivas de semáforos de FreeRTOS utilizan colas, por lo cual el tamaño de los programas puede crecer un poco.

Desarrollo

Para usar mutex’es debes crearlos, y como siempre, hay dos formas de hacerlo, de manera dinámica y de manera estática. Si no recuerdas la diferencia entre una y otra, sus ventajas y desventajas, entonces puedes revisar esta lección y esta otra. Comencemos con la primera ya que es la más fácil:

Creación dinámica de mutex’es

Esta función no tiene argumentos.

Valor devuelto: Handler al semáforo recién creado. Antes de usarlo deberías verificar que realmente se creó. (NOTA: Aunque las funciones para manipular los mutex son las mismas que las de los semáforos binarios y contadores de la API primitiva de semáforos, cuida de no mezclarlos.)

Para crear un mutex dinámico puedes hacer lo siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <semphr.h>

SemaphoreHandle_t g_mutex = NULL;
// debe ser global

void setup()
{
   // código ...

   g_mutex = xSemaphoreCreateMutex();

   configASSERT( g_mutex );
   // error creando al mutex

   // ... más código
}

Creación estática de mutex’es

pxMutexBuffer. Puntero a una variable StaticSemaphore_t que mantendrá el estado del semáforo. Esta variable deberá existir durante toda la vida del programa, por lo cual deberás crearla de manera global o estática a la función donde fue llamada xSemaphoreCreateMutexStatic().

Valor devuelto: Handler al semáforo recién creado. Los objetos estáticos siempre son creados. (NOTA: Aunque las funciones para manipular los mutex son las mismas que las de los semáforos binarios y contadores de la API primitiva de semáforos, cuida de no mezclarlos.)

Para crear un mutex estático puedes hacer lo siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <semphr.h>

SemaphoreHandle_t g_mutex = NULL;
// debe ser global

void setup()
{
   // código ...

   static StaticSemaphore_t* mutex_buffer;
   // variable de estado del mutex
   
   g_mutex = xSemaphoreCreateMutexStatic( &mutex_buffer );

   // la creación estática nunca falla

   // ... más código
}

En este ejemplo, como en muchos otros donde creamos objetos estáticos, las variables de estado y los arreglos necesarios los hemos creado marcándolos como static. Gracias a esto la variable mutex_buffer existirá durante todo el programa, y al mismo tiempo estará oculta al resto de las funciones. Por supuesto que podrías declararla global (como con g_mutex), pero entre menos variables globales tengas, mejor. g_mutex debe ser global porque tiene que ser accesada por varias tareas; la variable de estado mutex_buffer, no. (En el Ejemplo 4 veremos una forma para que g_mutex deje de ser global, ya sea que uses monitores o no.)

Dando y obteniendo al mutex

Una vez que el mutex ha sido creado lo podemos utilizar con las funciones para darlo y obtenerlo, xSemaphoreGive() y xSemaphoreTake(), respectivamente.

xSemaphore. Handler del mutex.

xTicksToWait. Tiempo de espera, en ticks, para obtener el mutex. Para convertir milisegundos a ticks puedes utilizar la macro pdMS_TO_TICKS(). Para no esperar puedes escribir 0; para un tiempo de espera infinito escribe portMAX_DELAY (asegúrate que la constante INCLUDE_vTaskSuspend, en el archivo FreeRTOSConfig.h, está puesta a 1).

xSemaphore. Handler del mutex.

Valor devuelto. pdTRUE si el mutex fue devuelto; pdFALSE si el mutex no pudo ser devuelto. Esto podría suceder porque el mutex no fue obtenido de manera correcta en primer lugar.

Ejemplos

Vamos a ver 4 ejemplos. Los primeros dos tratan sobre la creación y uso de los mutex «en crudo» (uno con memoria dinámica y otro con memoria estática). El tercer ejemplo muestra la implementación simple, pero efectiva, de un monitor. El cuarto ejemplo es una versión mejorada y más segura del tercer ejemplo.

Ejemplo 1: Mutex dinámico

Veamos un ejemplo completo con mutex dinámico. El programa trata de lo siguiente: el recurso a proteger es la transmisión al puerto serial a través de la función Serial.print(). Hay dos tareas, una con prioridad más alta que la otra (a_task y another_task, respectivamente), y ambas imprimen una cadena larga. A la tarea con baja prioridad le puse un truco para que se tomara mucho tiempo en escribir y pudiera ser interrumpida a la mitad por la tarea con más prioridad.

Al programa le agregué un mecanismo para activar/desactivar el mutex. En un programa normal esto no es necesario, ni deseable, pero lo hice para que puedas ver los resultados con y sin mutex.

#include <FreeRTOS.h>
#include <task.h>
#include <semphr.h>


SemaphoreHandle_t g_mutex = NULL;
// debe ser global


#define MUTEX_EN 0
// Habilita/deshabilita al mutex
// 0: Deshabilitado
// 1: Habilitado

void a_task( void* pvParameters )
{
   char str[66] = "HOLA MUNDO hola mundo HOLA MUNDO hola mundo HOLA MUNDO hola mundo";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 50 ) );

#if MUTEX_EN == 1
      if( xSemaphoreTake( g_mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){
      // toma el mutex; si no está disponible, entonces la tarea se va al estado Blocked
#endif
        
         Serial.println( str );

#if MUTEX_EN == 1
         xSemaphoreGive( g_mutex );
         // devuelve el mutex

      } else{ // timeover:
         Serial.println( "MTX(W)" );
      }
#endif
   }
}

void another_task( void* pvParameters )
{
   char str[63] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 47 ) );

#if MUTEX_EN == 1 // Con mutex:

      if( xSemaphoreTake( g_mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){
      // toma el mutex; si no está disponible, entonces la tarea se va al estado Blocked

         for( uint8_t i = 0; i < 62; ++i ){
            Serial.print( str[ i ] );
            vTaskDelay( pdMS_TO_TICKS( 1 ) );
         }
         Serial.println( "" );


         xSemaphoreGive( g_mutex );
         // devuelve el mutex

      } else{ // timeover:
         Serial.println( "MTX(R)" );
      }

#else            // Sin mutex:

      for( uint8_t i = 0; i < 62; ++i ){
         Serial.print( str[ i ] );
         vTaskDelay( pdMS_TO_TICKS( 1 ) );
      }
      Serial.println( "" );

#endif
   }
}

void setup()
{
   xTaskCreate( a_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( another_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_mutex = xSemaphoreCreateMutex();

   configASSERT( g_mutex );
   // error creando al mutex


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\n*** RESET ***" );

   vTaskStartScheduler();
}

void loop() {}

RECUERDA: Cuando la tarea termina con el recurso DEBE devolver al mutex. En las líneas 31 y 61 ambas tareas lo devuelven.

Podrás notar diferencias en la forma de activar/desactivar el mutex en cada una de las tareas. En la primera lo hice de tal modo que el código no se repitiera, mientras que en la segunda repetí el código. Lo hice así para que conozcas ambas formas; aunque en la segunda es más claro el uso del mutex, entre menos código copies y pegues, mejor.

La salida de este programa SIN mutex es así:

Programa sin mutex. La salidas de cada tarea se mezclan.

La salida de este programa CON mutex es así:

Programa con mutex. Cada tarea imprime correctamente.

Ejemplo 2: Mutex estático

Este ejemplo es casi idéntico al anterior, tanto que no lo voy a repetir, solamente voy a poner el código de la función setup() que es donde se crea al mutex estático. Todo lo demás se mantiene sin cambios:

void setup()
{
   xTaskCreate( a_task, "1T", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( another_task, "2T", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static StaticSemaphore_t mutex_buffer;
   // variable de estado del mutex
   
   g_mutex = xSemaphoreCreateMutexStatic( &mutex_buffer );

    // la creación estática nunca falla


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\n*** RESET ***" );

   vTaskStartScheduler();
}

Ejemplo 3: Monitor

En este ejemplo vamos a ver una versión simple, pero funcional y efectiva, de un monitor. El recurso a proteger sigue siendo la salida serial, y el monitor está implementado como un tipo abstracto a través de una clase de C++. Ambos, el mutex y el recurso deben ser variables miembro de la clase, sin embargo, para este ejemplo particular el recurso que estamos protegiendo está declarado a nivel global dentro de la arquitectura de Arduino, por lo cual no aparece declarado en la clase, pero en cualquier otro caso el recurso debería ser declarado como variable miembro de la clase que implementa al monitor, y de ser necesario, inicializarlo en el constructor:

class Monitor
{
private:
   SemaphoreHandle_t mutex{ NULL };

   // Resource resource;
   // aquí iría la declaración del recurso, pero Serial.print() es global al
   // sistema, así que sólo supondremos que lo hacemos

public:
   Monitor();

   bool PrintAll( const char* text );
   bool PrintOneByOne( const char* text, uint8_t len );
};

El monitor expone dos operaciones: PrintAll() y PrintOneByOne(). Estas son las únicas operaciones que se pueden realizar sobre el recurso. Además, por supuesto, está el constructor:

Monitor::Monitor()
{
   this->mutex = xSemaphoreCreateMutex();

   // deberíamos verificar que el mutex fue creado ... o mejor aún, crearlo de
   // manera estática

   // si el recurso necesita ser creado o inicializado, aquí lo haríamos
}

bool Monitor::PrintAll( const char* text )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      Serial.println( text );
      // recurso compartido: imprime el texto en una sola pasada

      xSemaphoreGive( this->mutex );

      retVal = true;
   } 

   return retVal;
}

bool Monitor::PrintOneByOne( const char* text, uint8_t len )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      // recurso compartido: imprime el texto caracter por caracter:
      for( uint8_t i = 0; i < len; ++i ){
         Serial.print( text[ i ] );
         vTaskDelay( pdMS_TO_TICKS( 1 ) );
      }
      Serial.println( "" );

      xSemaphoreGive( this->mutex );

      retVal = true;
   }

   return retVal;
}

Ambas operaciones devuelven un booleano para indicar si la operación se llevó a cabo (true), o si el tiempo expiró y el mutex no pudo ser adquirido (false). El tratamiento de esta última situación se maneja por fuera de la clase. En un sistema con más recursos las operaciones podrían lanzar una excepción (throw) y la tarea llamadora capturarla (catch) e intentar recuperarse.

Nota la simplicidad y seguridad para utilizar el recurso a través del monitor:

void a_task( void* pvParameters )
{
   char str[66] = "HOLA MUNDO hola mundo HOLA MUNDO hola mundo HOLA MUNDO hola mundo";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 50 ) );

      if( monitor.PrintAll( str ) == false ){ // time over:
         Serial.println( "TO:1" );
      }
   }
}

A continuación tenemos el ejemplo completo:

#include <FreeRTOS.h>
#include <task.h>
#include <semphr.h>

/*
 * El mutex queda encapsula en la clase y el recurso sólo puede ser utilizado
 * por las operaciones definidas
 */
class Monitor
{
private:
   SemaphoreHandle_t mutex{ NULL };

   // aquí iría la declaración del recurso, pero Serial.print() es global al
   // sistema, así que sólo supondremos que lo hacemos

public:
   Monitor();

   bool PrintAll( const char* text );
   bool PrintOneByOne( const char* text, uint8_t len );
};

Monitor::Monitor()
{
   this->mutex = xSemaphoreCreateMutex();

   // deberíamos verificar que el mutex fue creado ... o mejor aún, crearlo de
   // manera estática

}

bool Monitor::PrintAll( const char* text )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      Serial.println( text );

      xSemaphoreGive( this->mutex );

      retVal = true;
   } 

   return retVal;
}

bool Monitor::PrintOneByOne( const char* text, uint8_t len )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      for( uint8_t i = 0; i < len; ++i ){
         Serial.print( text[ i ] );
         vTaskDelay( pdMS_TO_TICKS( 1 ) );
      }
      Serial.println( "" );

      xSemaphoreGive( this->mutex );

      retVal = true;
   }

   return retVal;
}

Monitor monitor;
// el monitor es global para que las tareas tengan acceso a él. Una manera más
// segura es crearlo en setup() (marcado como static) y pasárselo a las tareas
// que van a ocupar el recurso en su parámetro pvParameters


void a_task( void* pvParameters )
{
   char str[66] = "HOLA MUNDO hola mundo HOLA MUNDO hola mundo HOLA MUNDO hola mundo";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 50 ) );

      if( monitor.PrintAll( str ) == false ){ // time over:
         Serial.println( "TO:1" );
      }
   }
}

void another_task( void* pvParameters )
{
   char str[63] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 47 ) );

      if( monitor.PrintOneByOne( str, 62 ) == false ){ // time over:
         Serial.println( "TO:2" );
      }
   }
}

void setup()
{
   xTaskCreate( a_task, "1T", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( another_task, "2T", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\n*** RESET ***" );

   vTaskStartScheduler();
}

void loop() {}

Ejemplo 4: Monitor mejorado

El siguiente programa incluye algunas mejoras con respecto del Ejemplo 3:

  • El mutex es creado de manera estática con xSemaphoreCreateMutexStatic().
  • El monitor deja de ser global; ahora se declara como static en la función setup() y se le pasa a las tareas que van a utilizar al recurso compartido a través del argumento pvParameters.

Los objetos estáticos son mejores que los dinámicos porque siempre se crean, así que no hay necesidad de preocuparse por ello, ni ninguna razón para liarse con memoria dinámica. Por otro lado, en el Ejemplo 3 el monitor puede ser utilizado por cualquier tarea; en la versión que estoy a punto de mostrarte solamente las tareas que reciban al monitor podrán utilizarlo. Con esto agregamos un escalón más en la seguridad y fiabilidad de nuestros programas.

Como ya hemos visto todo lo que necesitamos para implementar al monitor en la forma que he descrito, vámonos directamente al código:

#include <FreeRTOS.h>
#include <task.h>
#include <semphr.h>

/*
 * El mutex queda encapsula en la clase y el recurso sólo puede ser utilizado
 * por las operaciones definidas
 */
class Monitor
{
private:
   SemaphoreHandle_t mutex{ NULL };

   // aquí iría la declaración del recurso, pero Serial.print() es global al
   // sistema, así que sólo supondremos que lo hacemos

   StaticSemaphore_t mutex_buffer;
   // variable de estado del mutex. Se usa únicamente en la función
   // xSemaphoreCreateMutexStatic()

public:
   Monitor();

   bool PrintAll( const char* text );
   bool PrintOneByOne( const char* text, uint8_t len );
};

Monitor::Monitor()
{
   this->mutex = xSemaphoreCreateMutexStatic( &this->mutex_buffer );
   // los objetos estáticos siempre se crean
}

bool Monitor::PrintAll( const char* text )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      Serial.println( text );

      xSemaphoreGive( this->mutex );

      retVal = true;
   } 

   return retVal;
}

bool Monitor::PrintOneByOne( const char* text, uint8_t len )
{
   bool retVal{ false };

   if( xSemaphoreTake( this->mutex, pdMS_TO_TICKS( 100 ) ) != pdFALSE ){

      for( uint8_t i = 0; i < len; ++i ){
         Serial.print( text[ i ] );
         vTaskDelay( pdMS_TO_TICKS( 1 ) );
      }
      Serial.println( "" );


      xSemaphoreGive( this->mutex );

      retVal = true;
   }

   return retVal;
}

void a_task( void* pvParameters )
{
   Monitor* monitor = (Monitor*) pvParameters;
   // aceptamos al monitor

   char str[66] = "HOLA MUNDO hola mundo HOLA MUNDO hola mundo HOLA MUNDO hola mundo";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 50 ) );

      if( monitor->PrintAll( str ) == false ){ // time over:
         Serial.println( "TO:1" );
      }
   }
}

void another_task( void* pvParameters )
{
   Monitor* monitor = (Monitor*) pvParameters;
   // aceptamos al monitor

   char str[63] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 47 ) );

      if( monitor->PrintOneByOne( str, 62 ) == false ){ // time over:
         Serial.println( "TO:2" );
      }
   }
}

void setup()
{
   static Monitor monitor;
   // el monitor es local a setup(), pero 'static' logra que exista durante todo el programa, 
   // y al mismo tiempo nadie fuera de esta función lo verá

   xTaskCreate( a_task, "1T", 128 * 3, (void*) &monitor, tskIDLE_PRIORITY + 1, NULL );
   // solamente las tareas que reciban al monitor podrán utilizarlo

   xTaskCreate( another_task, "2T", 128 * 3, (void*) &monitor, tskIDLE_PRIORITY, NULL );
   // solamente las tareas que reciban al monitor podrán utilizarlo

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\n*** RESET ***" );

   vTaskStartScheduler();
}

void loop() {}

NOTA: El mecanismo anterior funciona de maravilla cuando son tareas las que están involucradas con el recurso; pero si se tratase de una interrupción quien fuera el productor o consumidor, entonces esta solución (hacer static al monitor) no funciona, ya que no existe modo de pasárselo a la interrupción. En tal caso el monitor debe ser global. Sin embargo, como verás a continuación, FreeRTOS recomienda no usar mutex’es dentro de interrupciones.

Otras funciones

Mutex’es e interrupciones

A pesar de que la API primitiva de semáforos incluye funciones espejo para ser utilizadas desde dentro de interrupciones (ISR’s) FreeRTOS recomienda no utilizar mutex’es dentro de ISR’s. Esto debido a dos cosas: la posible elevación temporal de la prioridad de la tarea que posee al mutex sólo tiene sentido a nivel de tarea, y una ISR no debería bloquearse esperando obtener al mutex. Por esta razón no mencionaré dichas funciones espejo.

Verificando si el mutex está disponible o no

En los ejemplos vistos hemos tratado de obtener directamente al mutex, y de no estar disponible, entonces la tarea se va al estado Blocked. Sin embargo, en algunas situaciones primero querremos saber si está disponible, y en caso negativo continuar sin tomarlo evitando que la tarea se bloquée. Para preguntar si el mutex está disponible puedes utilizar la función uxSemaphoreGetCount():

xSemaphore. Handler del mutex.

Valor devuelto. 1 si el mutex está disponible, y 0 si no lo está.

Preguntando cuál tarea posee al mutex

La función uxSemaphoreGetCount() te dice si el mutex está disponible o no, pero no te dice quién lo tiene; la función xSemaphoreGetMutexHolder() te indica cuál tarea lo posee. El valor devuelto por esta función está un poco enredado, así que intentaré explicarlo lo más simple posible:

xMutex. Handler del mutex del cual estamos preguntando.

Valor devuelto. Esta función puede devolver dos valores:
1. El handler de la tarea que lo posee, si es que alguna tarea lo posee, es decir, no está disponible.
2. NULL en cualquiera de los siguientes dos casos:
2.1 El semáforo indicado en xMutex no es un semáforo mutex, o
2.2 El mutex está disponible, y por lo tanto, ninguna tarea lo posee.

Suponiendo que no has mezclado diferentes tipos de semáforos, entonces bastaría con asumir que el valor NULL significa que ninguna tarea lo posee.

¿Qué sigue?

Hoy mencionamos los diversos problemas que enfrentamos cuando programamos de manera concurrente, y el problema que atacamos fue el de las condiciones de carrera, es decir del acceso por dos o más tareas a un mismo recurso compartido al mismo tiempo. Vimos que una solución es tratar al recurso como una sección crítica y limitar el acceso esta sección con un semáforo de exclusión mutua, o mutex.

Así mismo, llevamos el concepto de mutex un paso adelante con los monitores. Un monitor encapsula al mutex y al recurso compartido, y solamente permite que el cliente accese a éste a través de un conjunto mínimo de operaciones, evitando que se utilice mal o sobre un recurso diferente.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Comunicación inter-tareas (II): Mensajes (message buffers)

En la lección anterior estudiamos un mecanismo para pasar secuencias de bytes entre tareas, o de una interrupción a una tarea. Prácticamente todos los protocolos de comunicaciones son orientados al byte, lo que pone de manifiesto la importancia de este tipos de xxx.

Sin embargo, en casi todas nuestras aplicaciones vamos a necesitar transferir información de datos de tipos simples (int, float, etc) y de datos compuestos (struct), ya sea en forma de una variable o de un arreglo. Para estas ocasiones FreeRTOS incluyó los mensajes, cuya filosofía es la misma que la de los flujos, con la diferencia de que podrás transferir información de cualquier tipo, y no sólo bytes (por byte debemos entender: char, unsigned char, uint8_t, int8_t).

En la lección del día de hoy vamos a estudiar los mensajes explicando sus detalles y viendo 4 ejemplos representativos de su utilización.

Tabla de contenidos

¿Qué es y qué necesito?

Los mensajes (message buffers, en FreeRTOS) son un mecanismo que está basado en los flujos y que sirve para transferir información compleja (más de un byte), ya sea de tipos básicos del lenguaje C o de tipos compuestos.

Aunque similiares, los mensajes tienen diferencias importantes con respecto a los flujos. Una de éstas, y la más importante, es con respecto a la integridad de la información. Con los flujos puedes leer los bytes que tú quieras, pero esto no funciona con tipos diferentes a los bytes. Imagina que quieres usar un flujo para transmitir un entero de 32 bits (el cual requiere 4 bytes), pero a la hora de leer la información lees nada más 3 bytes. Esto es claramente un error que debemos evitar (pero que nos podría pasar), y los mensajes ya lo toman en cuenta; de hecho, en los mensajes no existe el punto de disparo (trigger point).

En uno de los ejemplos de la lección anterior la tarea consumidora escribía bytes individuales, y una vez que se alcanzaba un determinado número de ellos, la tarea consumidora era despertada y consumía todos. Bueno, esto no lo puedes hacer con los mensajes, afortunadamente. Esto es, si utilizando mensajes quieres recibir un arreglo de ints, entonces la tarea productora debe enviar un arreglo de ints con el mismo número de elementos. Si quieres transferir un dato de tipo float, entonces la tarea consumidora transmitiría exactamente 4 bytes y la tarea consumidora tomará exactamente los 4 bytes. Y esto es bueno.

Para que puedas utilizar los mensajes en tus programas asegúrate que el archivo stream_buffer.c sea parte del proceso de compilación. Este archivo ya está incluído cuando descargas FreeRTOS, y en consecuencia, en el proyecto Molcajete también está donde debe de estar, por lo que no debes procuparte por ello. Así mismo, en cada archivo fuente donde uses a los mensajes deberás agregar el encabezado #include <message_buffer.h>.

(Los mensajes están basados en los flujos, por lo que no existe un archivo message_buffer.c, pero lo que sí existe es el archivo de encabezado message_buffer.h.)

También es necesario que la constante INCLUDE_xTaskGetCurrentTaskHandle esté puesta a 1 (en el archivo de configuración FreeRTOSConfig.h):

#define INCLUDE_xTaskGetCurrentTaskHandle 1

Antes de ver las funciones de la API de los mensajes debes tener en cuenta en todo momento la integridad de la información, y para esto FreeRTOS impone unas reglas a los tamaños de los búferes de datos que vamos a utilizar; si estableces un búfer al que le falte un byte de lo que FreeRTOS espera, entonces no habrá transferencia de datos. Y eso es bueno. Los tamaños de los búferes se obtienen a partir de fórmulas simples, pero que hay que seguir al pie de la letra, ya que tienen que ver con la cantidad de elementos que quieres escribir/leer y con el tamaño en bytes de cada elemento. Si los calculas más pequeños de lo que tienen que ser, entonces no habrá transferencia de datos.

Para facilitar el uso de los mensajes en nuestros programas escribí 5 constantes numéricas, las cuales vamos a utilizar en los ejemplos de esta lección. De éstas tú estableces las dos primeras ; las últimas 3 se calculan de manera automática.

#define MESSAGE_ELEMS ( 8 )

Es el número de elementos (no de bytes) que deseas escribir/leer. Esta constante la debes utilizar en ambas funciones xMessageBufferSend() y xMessageBufferReceive() (las cuales explicaré más adelante). El número de elementos puede ir desde 1.

#define MESSAGE_TYPE float

Aquí estableces el tipo de dato del o de los elementos que deseas transmitir. En este ejemplo se trata de variables float, pero pueden ser de tipo compuesto, como lo verás en los ejemplos.

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

Calcula el tamaño del arreglo interno subyacente al mensaje. Esta constante la utilizarás en las funciones de creación del mensaje. Más adelante explicaré la fórmula. No deberías manipularlo directamente.

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

Calcula el número de bytes que necesita una transferencia. No deberías manipularlo directamente.

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )

Calcula el número de bytes que necesita un elemento de tipo MESSAGE_TYPE. No deberías manipularlo directamente.

La constante MESSAGE_BUFFER_SIZE requiere su propia explicación. El «manual» (es decir, la documentación oficial) dice que FreeRTOS agrega a la secuencia a transferir el número de bytes del mensaje. Esto es, si tu transferencia es de 100 bytes, entonces se transmitirán 104 bytes (ahorita explico el ‘4’). El problema es el siguiente: imagina que vas a transferir un arreglo de 5 elementos, donde cada elemento ocupa 20 bytes. Y si tú estableces que MESSAGE_BUFFER_SIZE sea de 100 bytes, entonces solamente podrás transferir, en una pasada, 4 de los 5 elementos, y siempre quedarán 16 bytes inutilizables. La solución es establecer MESSAGE_BUFFER_SIZE a 5*20+4=104 (bytes). Con esto aseguras transferir los 5 elementos en una pasada.

¿Y de dónde salió ese ‘4’? El «manual» dice que el número de bytes agregados, esos que guardan el tamaño del mensaje, deben ser equivalentes al número de bytes que ocuparía una variable del tipo size_t (éste es un alias para unsigned int), y la fórmula quedaría así:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )

El propio manual explica que un valor común para size_t es de 32 bits, o 4 bytes, lo cual es cierto para la mayoría de sistemas, pero no para el compilador GCC-AVR utilizado por Arduino. En esta plataforma, por lo menos para el chip ATmega328, ¡size_t es de 2 bytes! Y por supuesto, aunque los programas compilarían, no funcionarían como deben (ya le pasó al primo de un amigo). Por lo tanto, en lugar de utilizar size_t en la fórmula me ví obligado a escribir el valor 4. En otras plataformas no debería existir ese problema.

Desarrollo

Los mensajes necesitan un búfer de datos interno, es decir, un lugar dónde guardar cada byte. Este búfer es un arreglo de elementos uint8_t de tamaño MESSAGE_BUFFER_SIZE cuando creas a los objetos. Cuando utilizas creación dinámica de objetos este arreglo se crea de manera automática dentro de la función xMessageBufferCreate(), por lo cual no debes preocuparte de nada (en apariencia. Siempre que te sea posible utiliza la creación estática de objetos, es más segura); mientras que si usas la creación estática de objetos, tú eres responsable de pasarle dicho arreglo a la función xMessageBufferCreateStatic(). A continuación veremos ambas formas.

(Para conocer o recordar la diferencia y la configuración entre objetos dinámicos y estáticos, te recomiendo leer esta lección y esta otra lección, respectivamente. Aunque ambas hablan sobre tareas, la teoría aplica para cualquier tipo de objeto.)

Creación dinámica

La función para crear mensajes dinámicos es:

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. Para realizar transferencias completas en una sola pasada recuerda agregar size_t bytes extras, es decir, sizeof( size_t ) bytes. Puedes utilizar el valor MESSAGE_BUFFER_SIZE del que hablé hace un momento.

Valor devuelto. Si hubo memoria suficiente para crear al mensaje, entonces la función devuelve el handler; en caso contrario (no hubo memoria suficiente) devuelve NULL. Este handler lo debes de guardar, tanto para que el resto de funciones de flujos sepan sobre cuál van a operar, como para probar que efectivamente fue creado.

Nota que esta función, a diferencia de la función xStreamBufferCreate(), no incluye al parámetro para el punto de disparo, xTriggerLevelBytes.

Por ejemplo, si en tu programa quisieras transferir un arreglo de 8 valores float, entonces tendrías lo siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>

MessageBufferHandle_t g_message_buffer = NULL;
// el handler debe ser global

#define MESSAGE_ELEMS ( 8 )
#define MESSAGE_TYPE float
#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )
// forma general que NO funciona para el ATmega328, pero en el ejemplo
// completo dado más adelante, ya lo arreglamos

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

void setup()
{
   // código ...

   g_message_buffer = xMessageBufferCreate( MESSAGE_BUFFER_SIZE );

   configASSERT( g_message_buffer );
   // Error creando al mensaje

   // más código ...
}

El handler, g_message_buffer, es global porque debe ser visible tanto en la tarea productora como en la consumidora.

La creación dinámica de objetos no asegura que los objetos se creen, principalmente por escasez de memoria RAM. Por eso siempre deberías preguntar si el objeto realmente fue creado, y aunque existen diversas maneras de hacerlo, en este ejemplo escogí la macro configASSERT() que es una versión propia de la conocida assert() de C (en el archivo FreeRTOSConfig.h podrás ver la implementación que utilicé para el projecto Molcajete). Por supuesto que puedes usar al condicional if y tratar de recuperarte sin terminar el programa de manera abrupta.

Creación estática

Ahora veamos cómo crear a un mensaje de manera estática. Como es usual, esta forma de creación de objetos necesita más pasos, pero es mucho más segura ya que los objetos siempre serán creados.

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. Para realizar transferencias completas en una sola pasada recuerda agregar size_t bytes extras, es decir, sizeof( size_t ) bytes. Puedes utilizar el valor MESSAGE_BUFFER_SIZE del que hablé hace un momento.

pucMessageBufferStorageArea. Es el búfer interno del mensaje donde se guardarán los datos, y no es otra cosa que un arreglo de elementos de tipo uint8_t de tamaño al menos xBufferSizeBytes + 1. Este arreglo deberá existir a lo largo del programa, por lo cual deberás crearlo de manera global o estática a la función donde fue llamada xMessageBufferCreateStatic().

pxStaticMessageBuffer. Esta es la variable que guarda el estado del mensaje, es decir, la estructura de datos que maneja al mensaje (el mensaje es algo más que un simple arreglo). Al igual que el parámetro pucMessageBufferStorageArea, deberá existir a lo largo del programa.

Valor devuelto. El handler al mensaje. La creación estática de objetos nunca falla, a menos que el arreglo o la variable de estado sean NULL, cosa que tal vez nunca suceda.

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>

MessageBufferHandle_t g_message_buffer = NULL;
// el handler debe ser global

#define MESSAGE_ELEMS ( 8 )
#define MESSAGE_TYPE float
#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )
// forma general que NO funciona para el ATmega328, pero en el ejemplo
// completo dado más adelante, ya lo arreglamos

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )

void setup()
{
   // código

   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer =              // handler (debe ser global)
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,      // tamaño del búfer interno
         message_buffer_array,     // arreglo subyacente que hace las veces de búfer interno
         &message_buffer_struct ); // variable de estado

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el mensaje fue creado o no

   // más código ...
}

El tamaño del arreglo subyacente (message_buffer_array[]), es decir, el búfer donde se guardan los datos, debe ser un byte más grande que el establecido en el argumento xBufferSizeBytes. En este ejemplo marqué a las variables message_buffer_array y message_buffer_struct como static; esto para que perduren durante todo el programa, pero que a la vez sean invisibles al resto de funciones. Una alternativa es que las declares globales, aunque entre menos variables globales tengas en tus programas, mejor. Sin embargo el handler, g_message_buffer, sí que debe ser global porque tanto la tarea productora como la consumidora deben tener acceso a él. Este es un ejemplo de las veces en que sí son necesarias las variables globales. El prefijo g_ nos recuerda que es una variable global y que debemos tener mucho cuidado con ella.

Escribiendo y leyendo al mensaje

Una vez que el mensaje ha sido creado ya podemos utilizarlo escribiendo en y leyendo desde él. Los datos que deseas transmitir se deben encontrar en un búfer de tu aplicación, y dependiendo de lo que desees transmitir este búfer puede ser un arreglo o una variable, ya sean tipos básicos o tipos compuestos. En los ejemplos que veremos utilizamos ambas para que veas cómo se utilizan.

La función para escribir en el mensaje es:

xMessageBuffer. Es el handler del mensaje en el que queremos escribir.

pvTxData. Es un apuntador al búfer que queremos transmitir. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados al mensaje.

xDataLengthBytes. Es la cantidad máxima de bytes que deseas escribir al mensaje en una sola llamada. El número lo obtienes de multiplicar el número de elementos por el tamaño en bytes que ocupa cada elemento. Puedes utilizar el valor MESSAGE_SIZE del que hablé hace un momento.

xTicksToWait. Si no hay espacio disponible para una nueva copia de xDataLengthBytes bytes en el mensaje, entonces la tarea productora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Esta función devuelve el número de bytes escritos al mensaje. Para asegurarte de mantener la integridad de la información, siempre deberías verificar este valor. En los ejemplos te muestro cómo hacerlo.

La función para leer desde el mensaje es:

xMessageBuffer. Es el handler del mensaje del que queremos leer.

pvRxData. Es un apuntador al búfer en el que los datos leídos se van a guardar. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados desde el mensaje al búfer.

xBufferLengthBytes. Es la cantidad máxima de bytes que deseas leer desde el mensaje en una sola llamada. El número lo obtienes de multiplicar el número de elementos por el tamaño en bytes que ocupa cada elemento. Puedes utilizar el valor MESSAGE_SIZE del que hablé hace un momento.

xTicksToWait. Es el tiempo de espera máximo que la tarea consumidora estará en el estado Blocked mientras arriban bytes a partir de que el búfer estuviera vacío. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1).

Valor devuelto. Esta función devuelve el número de bytes leídos desde el flujo. Para asegurarte de mantener la integridad de la información, siempre deberías verificar este valor. En los ejemplos te muestro cómo hacerlo.

Ejemplos

Dada la importancia de transferir información de tipos básicos y compuestos creí conveniente realizar 4 ejemplos que muestran las diferentes formas en que podemos usar a los mensajes en nuestros programas:

  1. Transfiere un arreglo de valores float. El mensaje se crea de forma dinámica.
  2. Transfiere un valor float. El mensaje se crea de forma estática.
  3. Transfiere un arreglo de variables compuestas. El mensaje se crea de forma estática.
  4. Transfiere una variable compuesta. El mensaje se crea de forma estática.

Para los ejemplos 3 y 4 tomé como plantilla al ejemplo 2, de ahí que los 3 ejemplos (2, 3 y 4) utilicé la creación estática de mensajes. Sin embargo, con lo visto en el ejemplo 1 podrás crear mensajes dinámicos sin ningún inconveniente.

Nota cómo, cuándo y dónde utilicé las constantes simbólicas que vimos hace un momento. Utilizarlas te facilitará la programación y te ahorrará los problemas que yo enfrenté. En particular las funciones xStreamBufferSend() y xStreamBufferReceive() utilizan al mismo valor MESSAGE_SIZE para indicar el número de bytes que se escriben y se leen.

Hace un rato mencioné que la fórmula general para obtener el tamaño del búfer interno del mensaje es:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + sizeof( size_t ) )

Pero para el caso del ATmega328, utilizando al compilador GCC-AVR, en todos los ejemplos la he modificado a:

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )

Si tu compilador te entrega como resultado de sizeof( size_t ) el valor 4, entonces utiliza la fórmula general (para que no te involucres con magic numbers).

En las funciones de creación de las tareas notarás que utilicé como tamaño de pila el valor 128*3:

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );
   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );
   // ...
}

En lecciones anteriores estuve usando el valor 128, porque a los más declarábamos una o dos variables dentro de las tareas. Sin embargo, en esta lección (y en la anterior) hemos estado declarando arreglos de valores dentro de las tareas, y la memoria para estos arreglos es tomada de la pila de la tarea, por lo cual el tamaño de la pila debe considerarlos. Quizás exageré en un valor tan grande, pero tú siempre podrás calcularlo de acuerdo a las necesidades de tus aplicaciones.

En la lección anterior ya había advertido de lo miserable que se comportan las funciones Serial.print() y similares en programas concurrentes. Por supuesto que este triste comportamiento también aparece con los mensajes. Por eso notarás que en los ejemplos agregué información de depuración que puede ser habilitada o deshabilitada a tu gusto, estableciendo un valor conveniente en la constante USER_DBG:

#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())

Ejemplo 1: Transfiere un arreglo de valores float. El mensaje se crea de forma dinámica.

Este ejemplo muestra cómo transferir un arreglo de valores float. Puedes modificarlo para usarlo con valores enteros, int, o reales dobles, double.

Así mismo, y como ya había mencionado, deberías de verificar que el número de bytes transmitidos efectivamente es el mismo que tú especificaste:

if( bytes_sent != MESSAGE_SIZE ){ // timeout:
   Serial.print( "TO(W): " );
} 

Obviamente debes hacer la misma prueba del lado de la función consumidora:

if( received_bytes < MESSAGE_SIZE ) { // timeover:
   Serial.println( "TO(R)" );
} 

El ejemplo completo es el siguiente:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;


#define MESSAGE_ELEMS ( 8 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE float
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   MESSAGE_TYPE out_buffer[ MESSAGE_ELEMS ];

   uint8_t index = 0;

   float data = 0.5;

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 500 ) );

      for( uint8_t i = 0; i < MESSAGE_ELEMS; ++i ){
         
         out_buffer[ i ] = data;

         data += 0.5;

      }

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) out_buffer,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   MESSAGE_TYPE in_buffer[ MESSAGE_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) in_buffer,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         for( size_t i = 0; i < received_bytes; ++i ){
            Serial.println( in_buffer[ i ] );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_message_buffer = xMessageBufferCreate( MESSAGE_BUFFER_SIZE );

   configASSERT( g_message_buffer );
   // Error creando al flujo

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   vTaskStartScheduler();
}

void loop() {}

Una salida de este programa con la constante USER_DBG puesta a 1 es:

Ejemplo 2: Transfiere un valor float. El mensaje se crea de forma estática

Este ejemplo muestra cómo transferir un sólo valor float. Puedes modificarlo para usarlo con valores enteros, int, o reales dobles, double. Presta especial atención a la constante MESSAGE_ELEMS la cual está puesta a 1, ya que es éste el número de elementos que queremos transferir. Así mismo, como no tenemos necesidad de declarar un arreglo, pasaremos a las funciones de escritura/lectura la dirección de una variable float, la cual hace las veces de búfer (sí, una simple variable puede ser un búfer, todo depende del contexto).

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

#define MESSAGE_ELEMS ( 1 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE float
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 2
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   float data = 0.0;

   while( 1 )
   {

      vTaskDelay( pdMS_TO_TICKS( 500 ) );
      data += 0.5;

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) &data,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   float data = 0.0;

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) &data,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         Serial.print( "DATA: " );
         Serial.println( data );
      }
#endif
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa, con la constante USER_DBG puesta a 2 para imprimir el valor transferido, ya que la carga de Serial.print() no es tan alta (es sólo un valor float), es:

Ejemplo 3:Transfiere un arreglo de variables compuestas. El mensaje se crea de forma estática

Aunque es muy útil transferir variables o arreglos de tipos básicos, es igual de útil transferir información más compleja. En este ejemplo (y en el siguiente) tendremos un tipo compuesto Message (tú puedes darle el nombre que mejor se adapte en tu aplicación):

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;

La idea de este tipo compuesto es tener dos variables de diferentes tipos lógicamente relacionadas bajo un mismo nombre (lo cual coincide con la definición de estructura en C). El campo cont simplemente es un contador ascendente, mientras que el campo temp es la temperatura ambiente (a través de un sensor LM35). El ejemplo transferirá un arreglo de 4 elementos de tipo Message.

Nota que la constante MESSAGE_TYPE ahora está puesta al nombre que le dimos a la estructura, es decir:

#define MESSAGE_TYPE Message

El ejemplo completo es:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;


#define MESSAGE_ELEMS ( 4 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE Message
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())



void producer_task( void* pvParameters )
{
   Message data_tx[ MESSAGE_ELEMS ] = { 0 };

   while( 1 )
   {
      for( size_t i = 0; i < MESSAGE_ELEMS; ++i ){

         vTaskDelay( pdMS_TO_TICKS( 100 ) );

         data_tx[ i ].cont++;
         data_tx[ i ].temp = ( analogRead( A0 ) * 5.0 * 100.0 ) / 1024.0;
      }

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) data_tx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   Message data_rx[ MESSAGE_ELEMS ] = { 0 };

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) data_rx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         for( size_t i = 0; i < MESSAGE_ELEMS; ++i ){

            Serial.print( "CONT: " );
            Serial.print( data_rx[ i ].cont );

            if( data_rx[ i ].temp > 23.0 ){
               Serial.print( ", TEMP: " );
               Serial.print( data_rx[ i ].temp );
            }
            Serial.print( "\n" );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje


   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\nRESET" );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa con la constante USER_DBG puesta a 1 es:

Ejemplo 4: Transfiere una variable compuesta. El mensaje se crea de forma estática

Este último ejemplo muestra cómo transferir una variable simple de tipo Message. Las tareas productora y consumidora declaran sendas variables del tipo mencionado, mientras que la constante MESSAGE_ELEMS toma el valor 1:

#define MESSAGE_ELEMS ( 1 )

void producer_task( void* pvParameters )
{
   Message data_tx = { 0 };
   // ...
}

void consumer_task( void* pvParameters )
{
   Message data_rx = { 0 };
   // ...
}

El ejemplo completo es:

#include <FreeRTOS.h>
#include <task.h>
#include <message_buffer.h>


MessageBufferHandle_t g_message_buffer = NULL;

typedef struct Message_t
{
   uint16_t cont;
   float temp;
} Message;


#define MESSAGE_ELEMS ( 1 )
// ¿Cuántos elementos tendrá el búfer de mensajes del USUARIO?

#define MESSAGE_TYPE Message
// ¿De qué tipo es el búfer de mensajes?

#define MESSAGE_TYPE_SIZE ( sizeof( MESSAGE_TYPE ) )
// ¿Cuántos bytes ocupa una variable de tipo MESSAGE_TYPE?

#define MESSAGE_BUFFER_SIZE ( ( MESSAGE_ELEMS * MESSAGE_TYPE_SIZE ) + 4 )
// ¿Cuántos bytes ocupa el búfer INTERNO de mensajes?

#define MESSAGE_SIZE ( MESSAGE_ELEMS ) * ( MESSAGE_TYPE_SIZE )
// ¿Cuántos bytes ocupa el stream de mensajes que se transmitirá/recibirá?


#define USER_DBG 1
// Para habilitar/deshabilitar información de depuración:
// 0: Deshabilitada
// 1: Información básica
// 2: Más información, pero puede hacer que el sistema falle (debido a Serial.print())


void producer_task( void* pvParameters )
{
   Message data_tx = { 0 };

   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 500 ) );

      data_tx.cont++;
      data_tx.temp = ( analogRead( A0 ) * 5.0 * 100.0 ) / 1024.0;

      size_t bytes_sent = 
         xMessageBufferSend( 
            g_message_buffer,
            (void*) &data_tx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 50 ) );

#if USER_DBG > 0
      Serial.print( "ENVIADOS: " );
      Serial.println( bytes_sent );
#endif

      if( bytes_sent != MESSAGE_SIZE ){ // timeout:
         Serial.print( "TO(W): " );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   Message data_rx = { 0 };

   while( 1 )
   {
      size_t received_bytes = 
         xMessageBufferReceive(
            g_message_buffer,
            (void*) &data_rx,
            MESSAGE_SIZE,
            pdMS_TO_TICKS( 1000 ) );

#if USER_DBG > 0
      Serial.print( "RECIBIDOS: " );
      Serial.println( received_bytes );
#endif

      if( received_bytes < MESSAGE_SIZE ) { // timeover:
         Serial.println( "TO(R)" );
      } 

#if USER_DBG > 1
      else{
         Serial.print( "CONT: " );
         Serial.print( data_rx.cont );
         Serial.print( ", TEMP: " );
         Serial.println( data_rx.temp );
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );


   static uint8_t message_buffer_array[ MESSAGE_BUFFER_SIZE + 1 ];
   // arreglo subyacente para el búfer interno

   static StaticMessageBuffer_t message_buffer_struct;
   // variable de estado que mantiene a la estructura del mensaje

   g_message_buffer = 
   xMessageBufferCreateStatic( 
         MESSAGE_BUFFER_SIZE,
         message_buffer_array,
         &message_buffer_struct );


   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );

   Serial.println( "\nRESET" );

   vTaskStartScheduler();
}

void loop() {}

La salida de este programa con la constante USER_DBG puesta a 1 es:

Otras funciones de la API

Hemos estudiado apenas las funciones más útiles de la API, sin embargo quedan algunas que es importante que las conozcas (aquí podrás ver la API completa).

ESCRITURA Y LECTURA DESDE INTERRUPCIONES

En lecciones anteriores hablé de funciones espejo que deben ser utilizadas cuando son llamadas desde dentro de una interrupción (ISR); a este tipo de funciones FreeRTOS le llama interrupt safe version. Los mensajes tienen dos:

Los tres primeros parámetros y el valor devuelto son lo mismo que sus contrapartes. Pero observa dos diferencias fundamentales:

  • No tienen al parámetro xTicksToWait; esto es así porque una ISR no debe bloquearse jamás.
  • Las funciones ponen a pdTRUE el parámetro pxHigherPriorityTaskWoken si la llamada a éstas implica que una tarea de alta prioridad pasó del estado Blocked al estado Ready y es necesario hacer un cambio de contexto, es decir, pasarla al estado Run.

El siguiente fragmento muestra su posible uso (no lo probé, pero así se usa):

void producer_ISR()
{
   isr_ack();
   // "reconocemos" la interrupción (ack -> acknowledge)


   BaseType_t pxHigherPriorityTaskWoken = pdFALSE;
   // siempre debe ser inicializada a pdFALSE

   static uint32_t data = 0;

   size_t bytes_sent = 
     xMessageBufferSendFromISR( 
        g_message_buffer,
        (void*) &data,
        MESSAGE_SIZE,
        &pxHigherPriorityTaskWoken );

   ++data;
   // simulamos la información

   if( bytes_sent != MESSAGE_SIZE ){
      // no hubo espacio en el flujo
   }

   if( pxHigherPriorityTaskWoken != pdFALSE ) taskYIELD;
   // cambio de contexto opcional, pero recomendable
}

FUNCIONES AUXILIARES

La API de los mensajes incluye una serie de funciones para preguntar o alterar el estado del flujo:

  • BaseType_t xMessageBufferIsEmpty( MessageBufferHandle_t xMessageBuffer ). Devuelve pdTRUE si el mensaje indicado en xMessageBuffer está vacío, o pdFALSE en caso contrario.
  • BaseType_t xMessageBufferIsFull( MessageBufferHandle_t xMessageBuffer ). Devuelve pdTRUE si el mensaje indicado en xMessageBuffer está lleno, o pdFALSE en caso contrario.
  • size_t xMessageBufferBytesAvailable( MessageBufferHandle_t xMessageBuffer ). Devuelve el número de elementos actualmente en el mensaje indicado por xMessageBuffer que se pueden leer.
  • size_t xMessageBufferSpacesAvailable( MessageBufferHandle_t xMessageBuffer ). Devuelve el número de elementos libres en el mensaje indicado por xMessageBuffer, es decir, cuánto espacio queda libre.
  • BaseType_t xMessageBufferReset( MessageBufferHandle_t xMessageBuffer ). Pone a su estado inicial al mensaje indicado por xMessageBuffer.

¿Qué sigue?

Transferir información de una tarea a otra, o de una interrupción a una tarea, es una actividad muy común y muy importante en nuestros sistemas embebidos. En la lección de hoy vimos cómo transferir datos de tipos simples (diferentes de bytes) y de tipos compuestos utilizando el mecanismo de FreeRTOS, mensajes (en su nomenclatura oficial, message buffers). Los mensajes están basados en los flujos, pero salvaguardan la integridad de la información evitando que leas 3 bytes de un entero de 32 bits, o 1 byte de un número real de 4 bytes.

RECUERDA: Utiliza las constantes que te di al principio de esta lección, en lugar de calcular por tu cuenta los diferentes tamaños de búferes y bytes a transferir.

RECUERDA: Siempre que te sea posible utiliza objetos estáticos.

RECUERDA: Si vas a llamar a una función de FreeRTOS desde una interrupción, verifica que ésta sea interrupt safe version, es decir, tenga terminación FromISR.

RECUERDA: Los mensajes se utilizan para transferir todos los tipos básicos de C y tipos compuestos. Si tu intención es transferir bytes sin ningún formato, ni inicio ni fin, entonces deberías utilizar los flujos.

En la siguiente lección estudiaremos los semáforos mutex, los cuales nos pueden ayudar a que un recurso no sea accesado por más de una tarea al mismo tiempo.

(En esta página encontrarás información de suma importancia sobre los flujos, mensajes y notificaciones. Deberías leerla en cuanto te sea posible.)

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Comunicación inter-tareas (I): Flujos (stream buffers)

En los sistemas concurrentes y multitarea las diferentes tareas deben pasarse información; de otra manera el sistema no tendría utilidad. En los sistemas de cómputo tradicional con una o varias CPUs el sistema operativo reserva una área de la memoria RAM para escribir y leer información; a esta memoria se le conoce como memoria compartida, y dependiendo de la complejidad del sistema ésta podría estar protegida. En esta memoria compartida uno o varios procesos escriben, y uno o varios procesos leen, todo bajo la supervisión del sistema operativo, y quizás, de la propia CPU.

En nuestros sistemas embebidos de pequeña escala la memoria compartida podría tomar la forma de variables globales, arreglos globales o listas enlazadas globales, y casi siempre sin intervención ni supervisión de un sistema operativo. Esto es, podríamos declarar una variable global que sea escrita por una o más tareas y leída por una o más tareas, pero aunque este esquema es simple, es inseguro e ineficiente: ¿cómo sabemos que el dato está listo para ser leído?, ¿cómo evitamos que dos o más tareas escriban al mismo tiempo en la variable?

Para lograr una comunicación inter-tareas (inter-task communication, en inglés) segura y eficiente debemos utilizar esquemas más elaborados, de los cuales, afortunadamente, FreeRTOS incluye una cantidad de ellos, a saber:

  • Notificaciones. Sirven para pasar un dato de hasta 32 bits, y ya las estudiamos en esta lección.
  • Flujos. Sirven para pasar una cadena de bytes (sin formato) de una tarea a otra. Es el tema de la lección de hoy.
  • Mensajes. Sirven para pasar objetos (con formato) de una tarea a otra. Es el tema de la siguiente lección.
  • Colas. Igual que los mensajes. Históricamente aparecieron en FreeRTOS antes que los mensajes, y tienen ventajas sobre éstos, por ejemplo, diferentes tareas pueden escribir a la cola, y diferentes tareas pueden leer de la cola; esto no es posible ni con los fujos ni mensajes. Es el tema de una siguiente lección.

Las notificaciones, flujos y mensajes son de reciente aparición en FreeRTOS y han sido implementados como una opción ligera (consume menos recursos, pero con algunas restricciones) a las colas, las cuales existen prácticamente desde el inicio de de FreeRTOS.

Tabla de contenidos

¿Qué es y qué necesito?

Un flujo (stream buffer, en FreeRTOS) es una secuencia de bytes (es decir, datos de tipo uint8_t) para pasar información de una tarea a otra, o de una interrupción a una tarea. Como veremos más adelante, podemos pasar de una tarea a otra un arreglo completo de datos uint8_t, o juntar datos uint8_t individuales hasta completar un cierto número, después de lo cual la tarea consumidora los consumirá.

Hago énfasis en que el tipo de dato debe ser uint8_t, ya sea un arreglo o variable simple, porque pretender utilizar los flujos para pasar datos de tipos diferentes al mencionado (int, float, struct) conlleva una serie de problemas, que si no son resueltos, corromperán los datos (ya me pasó).

A TOMAR EN CUENTA: Los flujos (stream buffers, en FreeRTOS) se utilizan para pasar datos cuyo tipo nativo sea uint_8 (o su equivalente unsigned char). Si necesitas pasar tipos simples (int, float) o tipos compuestos (struct), entonces debes usar los mensajes (message buffers, en FreeRTOS).

Mucha de la información generada y procesada en nuestros sistemas embebidos es de tipo uint8_t:

  • Los datos que entran y salen por las UARTs.
  • Los datos que entran y salen por los canales SPI desde/hacia periféricos.
  • Los datos que entran y salen por los canales I2C e IIS desde/hacia periféricos.

Cuando te encuentres en uno de estos casos, entre otros muchos que no he enlistado, entonces utiliza los flujos.

Algo que debes saber cuando utilizas flujos (y mensajes y colas) es que cada byte se escribe y se lee por copia, en lugar de por referencia. Supogamos que en tu tarea tienes un búfer de datos para transmitir (un búfer es un arreglo) de 80 bytes. Cuando haces la llamada a la función xStreamBufferSend() (la cual veremos en un momento) los 80 bytes de tu búfer se copian al búfer interno del flujo (este búfer interno es un arreglo de tipo uint8_t). Cuando la tarea consumidora hace la llamada a la función xStreamBufferReceive() (la cual veremos en un momento) los 80 bytes del búfer interno del flujo se copian al arreglo que hayas declarado como búfer de recepción.

Otra cosa que debes saber cuando utilizas flujos y mensajes es que éstos están optmizados para operar tarea-a-tarea, es decir, una sola tarea escribe al flujo, y una sola tarea lee del flujo. No están hechos para que dos o más tareas escriban, o dos o más tareas lean desde el mismo flujo o mensaje, pero tampoco implementan un mecanismo que lo evite. Tú como programador eres responsable de que eso no pase. Pero si en verdad necesitas que un mismo flujo sea accesado por dos o más tareas, entonces deberás utilizar mecanismos que supervisen que solamente una tarea está accesando al flujo en un determinado momento. En FreeRTOS puedes utilizar los semáforos mutex (los cuales son tema de una próxima lección) para otorgar acceso exclusivo a una tarea a la vez.

Para que puedas utilizar los flujos en tus programas asegúrate que el archivo stream_buffer.c sea parte del proceso de compilación. Este archivo ya está incluído cuando descargas FreeRTOS, y en consecuencia, en el proyecto Molcajete también está donde debe de estar, por lo que no debes procuparte por ello. Así mismo, en cada archivo fuente donde uses a los flujos deberás agregar el encabezado #include <stream_buffer.h>.

También es necesario que la constante INCLUDE_xTaskGetCurrentTaskHandle esté puesta a 1 (en el archivo de configuración FreeRTOSConfig.h):

#define INCLUDE_xTaskGetCurrentTaskHandle 1

Los 3 mecanismos mencionados (flujos, mensajes y colas) necesitan un búfer de datos interno, es decir, un lugar dónde guardar cada byte. Este búfer es un arreglo de elementos uint8_t de un tamaño que tú estableces cuando creas a los objetos. Cuando utilizas creación dinámica de objetos este arreglo se crea de manera automática dentro de la función xStreamBufferCreate(), por lo cual no debes preocuparte de nada (en apariencia. Siempre que te sea posible utiliza la creación estática de objetos, es más segura); mientras que si usas la creación estática de objetos, tú eres responsable de pasarle dicho arreglo a la función xStreamBufferCreateStatic(). A continuación veremos ambas formas.

(Para conocer o recordar la diferencia y la configuración entre objetos dinámicos y estáticos, te recomiendo leer esta lección y esta otra lección, respectivamente. Aunque ambas hablan sobre tareas, la teoría aplica para cualquier tipo de objeto.)

Creación dinámica

Los flujos requieren ser creados, a diferencia de las notificaciones que son parte de las tareas. Como mencioné, podemos crearlos de forma dinámica o estática; veamos primero la forma dinámica:

xBufferSizeBytes. Es el número máximo de bytes que el búfer puede alojar. (size_t es un alias para unsigned long int, y lo usamos cuando se trata de tamaños y límites.)

xTriggerLevelBytes. Aquí especificas cuántos bytes se tienen que escribir al flujo para sacar del estado Blocked (es decir, despertar) a la tarea que estuviera esperando por la información. El valor mínimo es 1 y el máximo no puede ser mayor que xBufferSizeBytes. Por ejemplo, si el búfer estuviera vació y tú especificas el valor 10, entonces la tarea consumidora no saldrá del estado Blocked sino hasta que 10 bytes hayan sido escritos. Es posible cambiar este valor con la función xStreamBufferSetTriggerLevel(). Si la cantidad de bytes establecida en esta función no arriban dentro del tiempo establecido en la tarea consumidora, los n bytes que hubieran sido escritos estarán disponibles. En el ejemplo de los 10 bytes, si sólo llegaron 7 antes de que el tiempo de espera de la tarea consumidora expire, entonces ésta tendrá acceso a esos 7 bytes. Para saber cuántos bytes están disponibles usa la función xStreamBufferBytesAvailable().

Valor devuelto. Si hubo memoria suficiente para crear al flujo, entonces la función devuelve el handler; en caso contrario (no hubo memoria suficiente) devuelve NULL. Este handler lo debes de guardar, tanto para que el resto de funciones de flujos sepan sobre cuál van a operar, como para probar que efectivamente fue creado.

Por ejemplo, para crear un flujo de 128 elementos y que la tarea consumidora sea despertada (sacada del estado Blocked) cuando el flujo tenga al menos 12 bytes, el código nos queda así:

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>

StreamBufferHandle_t g_stream_buffer = NULL;
// el handler debe ser global


#define MAX_CAP 128
#define TRIGGER_POINT 12

void setup()
{
   // código

   g_stream_buffer = xStreamBufferCreate( MAX_CAP, TRIGGER_POINT );

   configASSERT( g_stream_buffer );
   // Error creando al flujo

   // más código ...
}

¿Porqué el valor para despertar a la tarea consumidora es 12, mientras que el tamaño máximo es mucho mayor? Que hayan llegado los 12 bytes necesarios no significa que la tarea consumidora los procesará de manera inmediata, peor si ésta es de baja prioridad. Esto es, cuando los 12 bytes hayan llegado al flujo la tarea consumidora pasará del estado Blocked al estado Ready (elegible para ejecutarse), pero quizás no al estado Run (es decir, usar ciclos de CPU) de manera inmediata. Entonces más datos podrían estarse escribiendo en el búfer durante el tiempo que le tome pasar del estado Ready al estado Run. Por esta razón habrá que hacer al búfer un poco más grande (aunque en el ejemplo exageré).

El handler, g_stream_buffer, es global porque debe ser visible tanto en la tarea productora como en la consumidora.

La creación dinámica de objetos no asegura que los objetos se creen, principalmente por escasez de memoria RAM. Por eso siempre deberíamos preguntar si el objeto realmente fue creado, y aunque existen diversas maneras de hacerlo, en este ejemplo escogí la macro configASSERT() que es una versión propia de la conocida assert() de C (en el archivo FreeRTOSConfig.h podrás ver la implementación que utilicé para el projecto Molcajete). Por supuesto que puedes usar al condicional if y tratar de recuperarte sin terminar el programa de manera abrupta.

Creación estática

Ahora veamos cómo crear a un flujo de manera estática. Como es usual, esta forma de creación de objetos necesita más pasos, pero es mucho más segura ya que los objetos siempre serán creados.

Los primeros dos argumentos son idénticos a los de la función xStreamBufferCreate(), así que no los repetiré, y la explicación de los restantes aquí está:

pucStreamBufferStorageArea. Es el búfer interno del flujo donde se guardarán los datos, y no es otra cosa que un arreglo de elementos de tipo uint8_t de tamaño al menos xBufferSizeBytes + 1. Este arreglo deberá existir a lo largo del programa, por lo cual deberás crearlo de manera global o estática a la función donde fue llamada xStreamBufferCreateStatic().

pxStaticStreamBuffer. Esta es la variable que guarda el estado del flujo, es decir, la estructura de datos que maneja al flujo (el flujo es algo más que un simple arreglo). Al igual que el parámetro pucStreamBufferStorageArea deberá existir a lo largo del programa.

Valor devuelto. El handler al flujo. La creación estática de objetos nunca falla, a menos que el arreglo o la variable de estado sean NULL, cosa que tal vez nunca suceda.

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>

StreamBufferHandle_t g_stream_buffer = NULL;
// el handler debe ser global

#define MAX_CAP 128
#define TRIGGER_POINT 12

void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, &consumer_handler );

   static uint8_t stream_buffer_array[ MAX_CAP + 1 ];
   // el array subyacente debe ser un elemento más grande que el parámetro xBufferSizeBytes
   // es static para que el arreglo perdure durante todo el programa sin ser visible al resto de funciones

   static StaticStreamBuffer_t stream_buffer_struct;
   // es static para que la variable perdure durante todo el programa sin ser visible al resto de funciones

   g_stream_buffer = xStreamBufferCreateStatic( 
         MAX_CAP,
         TRIGGER_POINT,
         stream_buffer_array,     // arreglo que hace las veces de búfer interno
         &stream_buffer_struct ); // variable de estado del flujo

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el flujo fue creado o no

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

El tamaño del arreglo subyacente (stream_buffer_array[]), es decir, el búfer donde se guardan los datos, debe ser un elemento más grande que el establecido en el argumento xBufferSizeBytes. En este ejemplo marqué a las variables stream_buffer_array y stream_buffer_struct como static; esto para que perduren durante todo el programa, pero que a la vez sean invisibles al resto de funciones. Una alternativa es que las declares globales, aunque entre menos variables globales tengas en tus programas, mejor. Sin embargo el handler, g_stream_buffer, sí que debe ser global porque tanto la tarea productora como la consumidora deben tener acceso a él. Este es un ejemplo de las veces en que sí son necesarias las variables globales. El prefijo g_ nos recuerda que es una variable global y que debemos tener mucho cuidado con ella.

Escribiendo y leyendo al flujo

Una vez que el flujo ha sido creado ya podemos utilizarlo escribiendo en y leyendo desde él. Los datos que deseas transmitir se deben encontrar en un búfer de tu aplicación, y dependiendo de lo que desees transmitir este búfer puede ser un arreglo o una variable, ambos de tipo char, unsigned char, int8_t, o uint8_t. Recuerda que si quieres transmitir variables de tipos simples o de tipos compuestos, entonces deberías utilizar a los mensajes (tema de la siguiente lección).

La función para escribir en el flujo es:

xStreamBuffer. Es el handler del flujo en el que queremos escribir.

pvTxData. Es un apuntador al búfer que queremos transmitir. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados al flujo.

xTicksToWait. Si no hay espacio disponible para una nueva copia de xDataLengthBytes bytes en el flujo, entonces la tarea productora (la que llama a esta función) entrará al estado Blocked por el tiempo determinado en este parámetro. Si el tiempo expira la tarea pasará al estado Ready. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1). Si el tiempo hubiese expirado, la función habrá escrito la mayor cantidad de bytes posible.

Valor devuelto. Esta función devuelve el número de bytes escritos al flujo.

La función para leer desde el flujo es:

xStreamBuffer. Es el handler del flujo del que queremos leer.

pvRxData. Es un apuntador al búfer en el que los datos leídos se van a guardar. Dicho búfer lo deberás promocionar a void*. Es importante que recuerdes que los elementos de este búfer son copiados desde el flujo al búfer.

xBufferLengthBytes. Es la cantidad máxima de bytes que deseas leer desde el flujo en una sola llamada. Para cualquier tipo diferente de char deberás multiplicar el número de elementos de ese tipo por el tamaño en bytes que ocupa una variable de ese tipo. Esta función devolverá tantos bytes como sea posible, teniendo como límite xBufferLengthBytes.

xTicksToWait. Es el tiempo de espera máximo que la tarea consumidora estará en el estado Blocked mientras arriban bytes a partir de que el búfer estuviera vacío. El tiempo de espera está en ticks y puedes usar la macro pdMS_TO_TICKS() para convertir milisegundos a ticks. Así mismo, si escribes 0 la función regresa inmediatamente, y si escribes portMAX_DELAY el tiempo de espera es infinito (siempre y cuando la constante simbólica INCLUDE_vTaskSuspend esté a 1). Si el tiempo hubiese expirado antes de completar la petición (), la función habrá leído la mayor cantidad de bytes posible.

Valor devuelto. Esta función devuelve el número de bytes leídos desde flujo.

Ejemplo 1

El primer ejemplo que vamos a ver muestra el uso más básico de los flujos: transmitir n bytes de la tarea productora a la consumidora; con n constante. Los flujos no imponen un inicio o un final en la secuencia de bytes, pero en muchas aplicaciones vamos a tomar 10 lecturas y transmitir ese mismo número, sin trucos ni cosas raras.

Con dicha restricción el ejemplo se simplifica ya que no tendremos que verificar si hay espacio en el flujo o no, ni preocuparnos por el punto de disparo (trigger point, en inglés) xTriggerLevelBytes. A éste lo podemos definir como 1 o como n. Recuerda que el punto de disparo es la cantidad de bytes que deben estar en el flujo para que la tarea consumidora sea sacada del estado Blocked. Pero como la copia del búfer de la tarea productora hacia el flujo es muy rápida, en muchas situaciones el resultado será el mismo con el punto de disparo a 1 o a n.

IMPORTANTE: La función de Arduino Serial.print() (y derivados) no son thread-safe; esto es, su comportamiento es miserable en ambientes multitarea (dos tareas tratando de imprimir), dado que Arduino nunca fue pensado para sistemas concurrentes. Esto quedó de manifiesto mientras desarrollaba el ejemplo, al grado que tuve que eliminar prácticamente todas las llamadas.

Mi primer idea para el ejemplo era leer del ADC, quitarle 2 bits a cada lectura (el ADC es de 10 bits) y transmitirlos; sin embargo, para ver que lo transmitido era idéntido a lo recibido debía imprimir los valores tanto en la tarea productora como en la consumidora (en el segundo ejemplo sí que uso al ADC). Pero dado lo que comenté en el recuadro anterior con respecto a Serial.print(), preferí irme por el lado seguro y utilizar un contador ascendente. De esta manera, lo impreso del lado de la tarea consumidora debía ser una secuencia de valores perfectamente definida. Sin más, aquí está el ejemplo:

#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>


StreamBufferHandle_t g_stream_buffer = NULL;

#define BUFFER_ELEMS ( 8 )
// número de elementos para cada uno de los búfers del usuario

#define STREAM_ELEMS ( 10 )
// número de elementos en el flujo. Hemos dejado un par de bytes para seguridad

#define STREAM_TRIGGER_POINT ( 8 )
// número de bytes escritos al flujo para que la tarea consumidora sea despertada


void producer_task( void* pvParameters )
{
   uint8_t out_buffer[ BUFFER_ELEMS ];

   uint8_t index = 0;

   uint8_t data = 0;

   while( 1 )
   {
      for( uint8_t i = 0; i < BUFFER_ELEMS; ++i ){
         vTaskDelay( pdMS_TO_TICKS( 50 ) );
       
         out_buffer[ i ] = data++;
      }

      size_t bytes_sent = 
         xStreamBufferSend( 
            g_stream_buffer,
            (void*) out_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 50 ) );

      if( bytes_sent != sizeof( out_buffer ) ){ // timeout:
         Serial.print( "TO(W): " );
         Serial.println( bytes_sent );
      } 
   }
}

void consumer_task( void* pvParameters )
{
   uint8_t in_buffer[ BUFFER_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xStreamBufferReceive(
            g_stream_buffer,
            (void*) in_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 500 ) );

         Serial.print( "RECIBIDOS: " );
         Serial.println( received_bytes );

      if( received_bytes < BUFFER_ELEMS ) { // timeover:
         Serial.println( "TO(R)" );
      } else{

         for( size_t i = 0; i < received_bytes; ++i ){
            Serial.println( in_buffer[ i ] );
         }

      }
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128 * 3, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128 * 3, NULL, tskIDLE_PRIORITY, NULL );

   g_stream_buffer = xStreamBufferCreate( STREAM_ELEMS, STREAM_TRIGGER_POINT );

   configASSERT( g_stream_buffer );
   // Error creando al flujo

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Una corrida de este ejemplo es:

Salida del programa. La información hasta el símbolo -> es parte de la terminal serial de la IDE de Arduino. Lo importante es la secuencia correcta de valores.

Lo primero que notarás en el ejemplo es que utilicé constantes simbólicas, #define XXX YYY, para establecer algunos tamaños, por seguridad. Imagina que usas números directamente, y luego cambias de 8 elementos a 6, pero se te olvida realizar la actualización en todos los lados donde el tamaño del búfer va a ser utilizado. Eso no es bueno, nada bueno. Establecí el punto de disparo en 8 elementos; esto es, cuando el flujo tenga 8 elementos, entonces sacará del estado Blocked a la tarea consumidora. El valor 1 entrega los mismos resultados.

Ejemplo 2

En este segundo ejemplo veremos algunas cosas interesantes:

  • La creación estática de los flujos.
  • Escribiremos al flujo dato por dato, es decir, la tarea productora no implementa un búfer, como en el ejemplo anterior, sino que una vez generada la información la escribiremos. Esto implica que en el parámetro xTicksToWait de la función xStreamBufferSend() podríamos escribir un tiempo de espera de 0; esto es, la tarea productora no se bloqueará en caso de que el flujo esté lleno, simplemente regresará. Usé 0 para variar al ejemplo.
  • Para este ejemplo sí utilicé un sensor de temperatura. El valor 11 que verás repetido en la imagen de la corrida del programa se corresponde con una temperatura de 21.48 grados.
  • Establecí el punto de disparo (trigger point) a 8. Esto significa que la tarea consumidora será despertada (es decir, sacada del estado Blocked) cada vez que se hayan realizado 8 lecturas del ADC.
  • El ADC del ATmega328 es de 10 bits, pero para meterlo en un dato de 8 bits le quité dos. En el código agregué la fórmula en caso de que quieras imprimir la temperatura en lugar del valor de lectura del ADC.
#include <FreeRTOS.h>
#include <task.h>
#include <stream_buffer.h>


StreamBufferHandle_t g_stream_buffer = NULL;

#define BUFFER_ELEMS ( 8 )
// número de elementos para cada uno de los búfers del usuario

#define STREAM_ELEMS ( 10 )
// número de elementos en el flujo. Hemos dejado un par de bytes para seguridad

#define STREAM_TRIGGER_POINT ( 8 )
// número de bytes escritos al flujo para que la tarea consumidora sea despertada

void producer_task( void* pvParameters )
{
   TickType_t xLastWakeTime = xTaskGetTickCount();

   uint8_t data = 0;

   while( 1 )
   {
      vTaskDelayUntil( &xLastWakeTime, pdMS_TO_TICKS( 50 ) );

      data = analogRead( A0 ) >> 2;
      // -- usé el sensor LM35
      // -- le quitamos dos bits a la lectura (el valor es correcto, pero con menos
      // resolución)

      size_t bytes_sent = xStreamBufferSend( g_stream_buffer, (void*) &data, 1, 0 );
      // escribimos un sólo byte y no hay necesidad de esperar

      if( bytes_sent == 0 ){ // timeout:
         Serial.println( "TO(W)" );
      }
   }
}

void consumer_task( void* pvParameters )
{
   uint8_t in_buffer[ BUFFER_ELEMS ];

   while( 1 )
   {
      size_t received_bytes = 
         xStreamBufferReceive(
            g_stream_buffer,
            (void*) in_buffer,
            BUFFER_ELEMS,
            pdMS_TO_TICKS( 500 ) );

         Serial.print( "RECIBIDOS: " );
         Serial.println( received_bytes );

      if( received_bytes < BUFFER_ELEMS ) { // timeover:
         Serial.println( "TO(R)" );
      } 
#if 1
      else{

         for( size_t i = 0; i < received_bytes; ++i ){

            Serial.println( in_buffer[ i ] );

            // si quieres imprimir la temperatura usa la siguiente fórmula:
            // temp = ( in_buffer[ i ] * 500 ) / 256.0;
            // Serial.println( temp );
         }
      }
#endif
   }
}


void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, NULL );

   static uint8_t stream_buffer_array[ STREAM_ELEMS + 1 ];
   // el array subyacente debe ser un elemento más grande que el parámetro xBufferSizeBytes
   // es static para que el arreglo perdure durante todo el programa sin ser visible al resto de funciones

   static StaticStreamBuffer_t stream_buffer_struct;
   // es static para que la variable perdure durante todo el programa sin ser visible al resto de funciones


   g_stream_buffer = xStreamBufferCreateStatic( 
         STREAM_ELEMS,
         STREAM_TRIGGER_POINT,
         stream_buffer_array,
         &stream_buffer_struct );

   // los objetos estáticos siempre se crean, así que no hay necesidad de
   // preguntar si el flujo fue creado o no

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Una corrida de este programa es:

El valor 11 se corresponde a una temperatura de 21.48 grados.

Otras funciones de la API

Hemos estudiado apenas las funciones más útiles de la API, sin embargo quedan algunas que es importante que las conozcas (aquí podrás ver la API completa).

Escritura y lectura desde interrupciones

En lecciones anteriores hablé de funciones espejo que deben ser utilizadas cuando son llamadas desde dentro de una interrupción (ISR); a este tipo de funciones FreeRTOS le llama interrupt safe version. Los flujos tienen dos:

Los tres primeros parámetros y el valor devuelto son lo mismo que sus contrapartes. Pero observa dos diferencias fundamentales:

  • No tienen al parámetro xTicksToWait; esto es así porque una ISR no debe bloquearse jamás.
  • Las funciones ponen a pdTRUE el parámetro pxHigherPriorityTaskWoken si la llamada a éstas implica que una tarea de alta prioridad pasó del estado Blocked al estado Ready y es necesario hacer un cambio de contexto, es decir, pasarla al estado Run.

El siguiente fragmento muestra su posible uso (no lo probé, pero así se usa):

void producer_ISR()
{
   isr_ack();
   // "reconocemos" la interrupción (ack -> acknowledge)


   BaseType_t pxHigherPriorityTaskWoken = pdFALSE;
   // siempre debe ser inicializada a pdFALSE

   static uint8_t data = 0;

   size_t bytes_sent = 
     xStreamBufferSendFromISR( 
        g_stream_buffer,
        (void*) &data,
        1,
        &pxHigherPriorityTaskWoken );

   if( bytes_sent == 0 ){
      // no hubo espacio en el flujo
   }

   if( pxHigherPriorityTaskWoken != pdFALSE ) taskYIELD;
   // cambio de contexto opcional, pero recomendable
}

Funciones auxiliares

La API de los flujos incluye una serie de funciones para preguntar o alterar el estado del flujo:

  • BaseType_t xStreamBufferIsEmpty( StreamBufferHandle_t xStreamBuffer ). Devuelve pdTRUE si el flujo indicado en xStreamBuffer está vacío, o pdFALSE en caso contrario.
  • BaseType_t xStreamBufferIsFull( StreamBufferHandle_t xStreamBuffer ). Devuelve pdTRUE si el flujo indicado en xStreamBuffer está lleno, o pdFALSE en caso contrario.
  • BaseType_t xStreamBufferSetTriggerLevel( StreamBufferHandle_t xStreamBuffer, size_t xTriggerLevel ). Establece el nuevo punto de disparo xTriggerLevel para el flujo indicado en xStreamBuffer. Devuelve pdTRUE si xTriggerLevel es menor o igual al tamaño del flujo, o pdFALSE en caso contrario (el punto de disparo es mayor que el tamaño del flujo).
  • size_t xStreamBufferBytesAvailable( StreamBufferHandle_t xStreamBuffer ). Devuelve el número de elementos actualmente en el flujo indicado por xStreamBuffer que se pueden leer.
  • size_t xStreamBufferSpacesAvailable( StreamBufferHandle_t xStreamBuffer ). Devuelve el número de elementos libres en el flujo indicado por xStreamBuffer, es decir, cuánto espacio queda libre.
  • BaseType_t xStreamBufferReset( StreamBufferHandle_t xStreamBuffer ). Pone a su estado inicial al flujo indicado por xStreamBuffer.

¿Qué sigue?

En esta lección hemos estudiado el primero de 3 mecanismos que FreeRTOS incluye para pasar largas cantidades de información entre tareas, o entre una interrupción y una tarea. Vimos cómo crear flujos de manera dinámica y estática, y también cómo escribir y leer tanto arreglos como datos individuales.

RECUERDA: Siempre que te sea posible utiliza objetos estáticos.

RECUERDA: Si vas a llamar a una función de FreeRTOS desde una interrupción, verifica que ésta sea interrupt safe version, es decir, tenga terminación FromISR.

RECUERDA: Los flujos se utilizan para transmitir y recibir secuencias de bytes. Si tu intención es otra, entonces usa mensajes o colas.

En la siguiente lección estudiaremos a los mensajes, los cuales son una extensión lógica (y muy necesaria) de los flujos. Con los mensajes ya podremos transmitir y recibir datos más grandes que uint8_t, tales como int‘s, float‘s, y tipos compuestos, struct.

(En esta página encontrarás información de suma importancia sobre los flujos, mensajes y notificaciones. Deberías leerla en cuanto te sea posible.)

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Semáforos con notificaciones

En la programación concurrente, la que hemos estado realizando con FreeRTOS, es difícil, si no imposible, determinar a priori cuándo se va a ejecutar una tarea; peor aun cuando la ejecución de una tarea depende de otra. Para sincronizar tareas utilizamos semáforos binarios. Un semáforo binario tiene solamente dos posibles valores, 0 y 1. El valor 0 puede significar que el evento esperado no ha sucedido, mientras que el valor 1 puede significar que el evento ya sucedió y que podemos seguir adelante.

Por otro lado a veces una tarea tiene que contar eventos (personas, vueltas, autos, etc) y emitir un aviso a otra tarea cuando se llegue a un límite predefinido, o ir procesando uno a uno. En estos casos utilizamos semáforos contadores, los cuales son mucho mejores que una variable entera contadora. Así mismo, los semáforos contadores los podemos utilizar también para administrar los limitados recursos de nuestros sistemas.

Además de los semáforos binarios y contadores, existe una tercer clase de semáforos, los mutex (del inglés, mutual exclusion, y en español, exclusion mutua). Este tipo de semáforos asegura, hasta cierto punto, que un recurso solamente puede ser accesado por una tarea a la vez. Imagina que tienes dos o más tareas, donde cada una utiliza al convertidor ADC. El ADC de los procesadores es uno solo con varios canales. Y resulta que, al menos en el procesador ATmega328, si inicias una conversión mientras otra está en proceso, ambas se corrompen. ¿Qué hacer para asegurarse que nada más una tarea accesa al ADC a la vez? Un semáforo mutex. Una primer tarea obtiene el mutex, usa al ADC, y luego devuelve el mutex para que otra tarea lo use.

En FreeRTOS hay dos formas de implementar a los semáforos binarios y contadores: utilizando las notificaciones directas a la tarea (direct to task notifications) o utilizando primitivas del sistema operativo para semáforos (aquí puedes ver la API de estas primitivas). La diferencia es que estás últimas consumen más recursos (aunque pueden notificar a más de una tarea), y el propio creador de FreeRTOS recomienda las notificaciones directas en lugar de las primitivas para la mayoría de casos (mientras que para casos extremos recomienda usar dichas primitivas). En esta lección veremos semáforos binarios y contadores a través de las notificaciones.

Los semáforos mutex son más complejos, por lo que la única forma de implementarlos es a través de las primitivas de FreeRTOS, lo cual requeriría de su propia lección, y por eso no los trataremos en la lección de hoy.

En esta lección y en esta ya traté ampliamente el tema de las notificaciones directas a la tarea, y aunque en la lección de hoy los semáforos binarios y contadores están basadas en éstas, afortunadamente no debemos meternos en sus detalles, ya que las funciones para usar semáforos se encargan de ellos.

Tabla de contenidos

¿Qué es y qué necesito?

Los semáforos son mecanismos de sincronización, conteo y exclusión mutua internos a los sistemas operativos concurrentes (o multitarea). Los semáforos operan sobre dos premisas, ser dados y ser obtenidos (en inglés, given y taken, respectivamente). La tarea que quiere avisar que el evento se produjo da al (u otorga el) semáforo, mientras que la tarea que está a la espera de él lo obtiene. (A la tarea que lo da le estaré llamando «la tarea productora«, mientras que a la tarea que lo obtiene le llamaré «la tarea consumidora«). En la literatura encontrarás diferentes denominaciones para las operaciones de dar y obtener (Given/Taken, P/V, Signal/Wait, etc) pero la semántica es la misma; voy a usar los términos given y taken porque son los utilizados por FreeRTOS.

Un semáforo binario puede ser dado una vez, y obtenido una vez, mientras que un semáforo contador puede ser dado varias veces y obtenido una o varias veces. El semáforo binario es un caso especial del semáforo contador; siendo que la cuenta máxima del semáforo binario es uno. El semáforo mutex es un semáforo binario, pero con características extras, como la capacidad de elevar (de forma temportal) la prioridad de la tarea que lo obtiene.

Los semáforos basados en notificaciones directas no necesitan ser inicializados ni destruídos, ya que cada tarea incluye un campo de 32 bits que se usa para llevar la cuenta. FreeRTOS proporciona, en su forma más básica, dos funciones para las operaciones de dar y obtener (hay más, de las cuales hablaré después porque tienen que ver con ser llamadas desde interrupciones), xTaskNotifyGive() y ulTaskNotifyTake(), respectivamente.

La función para dar el semáforo es:

xTaskToNotify. Es el handler de la tarea consumidora, es de decir, aquella tarea que está esperando por el semáforo. Una de las limitaciones de usar semáforos basados en notificaciones es que únicamente una tarea puede ser notificada. Si un mismo evento debe ser notificado a varias tareas, entonces tendrías que usar varios semáforos.

Valor devuelto. Esta función siempre devuelve el valor pdPASS.

RECUERDA QUE: la operación Give (o dar) incrementa en uno al contador del semáforo.

La función para obtener el semáforo es:

xClearCountOnExit. Cuando este parámetro vale pdFALSE la cuenta se decrementa en 1 antes de abandonar la función. Lo utilizaremos para semáforos contadores. Cuando este parámetro vale pdTRUE la cuenta se pone a 0 antes de abandonar la función. Lo utilizaremos para semáforos binarios.

xTicksToWait. Tiempo de espera, en ticks, antes de que la tarea se desbloquée; esto es, después de una llamada a esta función la tarea esperará a lo más xTicksToWait ticks en el estado Block (es decir, sin usar ciclos de CPU) antes de despertarse. Para convertir milisegundos a ticks puedes utilizar la macro pdMS_TO_TICKS(). Para no esperar puedes escribir 0; para un tiempo de espera infinito escribe portMAX_DELAY (asegúrate que la constante INCLUDE_vTaskSuspend, en el archivo FreeRTOSConfig.h, está puesta a 1).

Valor devuelto. Esta función devuelve valor de la cuenta antes de que se le aplique lo indicado por xClearCountOnExit.

RECUERDA QUE: la operación Take (u obtener) decrementa en uno al contador del semáforo o lo pone a cero (dependiendo de si xClearCountOnExit es pdFALSE o pdTRUE, respectivamente).

Semáforos binarios

Vamos a ver un ejemplo simple de semáforo binario utilizando las notificaciones directas a la tarea. La tarea productora otorga el semáforo a intervalos regulares de tiempo. En una aplicación real o más elaborada estaría avisando que una conversión está lista, o que un mensaje se recibió, o que se llegó a una cuenta de eventos externos. La siguiente línea es la que otorga al semáforo:

xTaskNotifyGive( consumer_handler );

El handler a la tarea que obtiene al semáforo debe ser global para que la tarea productora lo vea.

Por otro lado, la tarea consumidora se bloquea mientras el semáforo no le sea dado:

if( ulTaskNotifyTake( pdTRUE, pdMS_TO_TICKS( 2000 ) ) == 1 ){

    // el semáforo fue obtenido y se hace algo

} else{ 

    // el semáforo no se obtuvo después del tiempo de espera

}

Observa que: el parámetro xClearCountOnExit está a pdTRUE indicando que después de leer la cuenta, ésta será puesta a 0; y que el tiempo de espera lo pusimos a 2000 ms. Si el semáforo no fuera dado dentro de esta ventana de tiempo, entonces tomamos acciones correctivas.

/*
 * Semáforo binario
 */

#include <FreeRTOS.h>
#include <task.h>

TaskHandle_t consumer_handler = NULL;
// la tarea productora debe conocer el handler de la tarea receptora


void producer_task( void* pvParameters )
{
   TickType_t xLastWakeTime = xTaskGetTickCount();

   while( 1 )
   {
      vTaskDelayUntil( &xLastWakeTime, pdMS_TO_TICKS( 1000 ) );

      xTaskNotifyGive( consumer_handler );
   }
}

void consumer_task( void* pvParameters )
{
   while( 1 )
   {
      if( ulTaskNotifyTake( pdTRUE, pdMS_TO_TICKS( 2000 ) ) == 1 ){

         digitalWrite( 13, HIGH );
         vTaskDelay( pdMS_TO_TICKS( 250 ) );
         digitalWrite( 13, LOW );

      } else{ 
      // timeout:

         digitalWrite( 13, HIGH );

      }
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, &consumer_handler );

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Semáforos contadores

La teoría detrás de los semáforos contadores indica que podemos usar a éstos para contar eventos (ir de 0 hasta un cierto número), o para contar recursos (ir de un cierto número a 0). Sin embargo, los semáforos basados en notificaciones a la tarea de FreeRTOS no poseen ninguna de estas cualidades; es decir, la única forma para saber que la cuenta llegó a un máximo establecido es que la tarea receptora lleve la cuenta por cada elemento del cual se le ha notificado, y cuando llegue al máximo, realizar lo necesario.

Por otro lado, tampoco es posible inicializar a estos semáforos en un cierto número y luego irlo decrementando por cada recurso utilizado, lo cual complica, al igual que en el escenario recién descrito, llevar la cuenta de los recursos que quedan en un sistema, ya que es el programador, y no el semáforo, quien lo debe hacer. Los propios ejemplos de FreeRTOS se adaptan a la falta de ambas cualidades que mencioné, dejando nada más una aplicación práctica para este tipo de semáforos contadores: las interrupciones diferidas.

Es de todos sabido que el código dentro de una interrupción (ISR, en inglés) debe ser lo más rápida posible. Sin embargo, algunas veces debemos realizar algún tipo de procesamiento «lento» sobre el dato o evento que originó la llamada a la ISR. Una técnica eficiente es diferir el procesamiento de la ISR pasándole la responsabilidad del procesamiento a una tarea. A eso es a lo que le decimos interrupciones diferidas, y podemos utilizar semáforos contadores para llevarla a cabo.

En el siguiente ejemplo la tarea productora genera «eventos» a dos frecuencias diferentes, una lenta y una rápida, tratando de imitar la llegada de eventos a diferentes tiempos; en cada evento otorga el semáforo. Por otro lado, la tarea consumidora «consume» (no esperábamos menos) un evento a la vez. Esos números raros que verás en el código son números primos que utilicé para que la simulación fuera más realista, evitando que hubiera alguna relación entre ellos. Los «eventos» no son otra cosa que un contador que se incrementa cada vez que el semáforo es dado, y que se decrementa cuando el semáforo es obtenido.

Vale la pena notar la diferencia en la llamada a la función ulTaskNotifyTake() en un semáforo contador contra el binario que vimos hace un momento:

if( ulTaskNotifyTake( pdFALSE, pdMS_TO_TICKS( 1901 ) ) > 0 ){ /*...*/ }

El argumento xClearCountOnExit está puesto a pdFALSE, indicando que la cuenta debe ser decrementada (en lugar de ser puesta a 0, como en los binarios), y además probamos si el valor devuelto es mayor que 0. Si es mayor a 0 quiere decir que hay «eventos» que deben ser procesados.

/*
 * Semáforo contador
 */

#include <FreeRTOS.h>
#include <task.h>

TaskHandle_t consumer_handler = NULL;

uint16_t g_token = 0;
// contador de "eventos"


void producer_task( void* pvParameters )
{
   TickType_t xLastWakeTime = xTaskGetTickCount();

   bool rate = true;
   uint8_t cont = 3;
   TickType_t frequency = 347;

   while( 1 )
   {
      vTaskDelayUntil( &xLastWakeTime, pdMS_TO_TICKS( frequency ) );

      ++g_token;
      xTaskNotifyGive( consumer_handler );


      --cont;
      if( cont == 0 ){

         if( rate == true ){    // lento:
            rate = false;
            cont = 5;
            frequency = 1451;
         } else{                // rápido:
            rate = true;
            cont = 3;
            frequency = 347;
         }
      }

      Serial.print( "TOKEN(G): " );
      Serial.println( g_token );
   }
}

void consumer_task( void* pvParameters )
{
   while( 1 )
   {
      vTaskDelay( pdMS_TO_TICKS( 907 ) );
      // hacemos tiempo para que se nos acumulen las notificaciones

      if( ulTaskNotifyTake( pdFALSE, pdMS_TO_TICKS( 1901 ) ) > 0 ){

         --g_token;
         Serial.print( "TOKEN(T): " );
         Serial.println( g_token );

      } else{ 
      // timeout:

         Serial.println( "TIMEOVER" );

      }
   }
}

void setup()
{
   xTaskCreate( producer_task, "PROD", 128, NULL, tskIDLE_PRIORITY + 1, NULL );

   xTaskCreate( consumer_task, "CONS", 128, NULL, tskIDLE_PRIORITY, &consumer_handler );

   pinMode( 13, OUTPUT );

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() {}

Una salida parcial del programa anterior es:

TOKEN(G) significa: semáforo dado; TOKEN(T) significa: semáforo obtenido. La cifra es el número de notificaciones pendientes.

Semáforos y las interrupciones

Si hay un lugar en el que seguramente vamos a querer usar semáforos es dentro de las interrupciones (ISRs), ya sea por diseño o porque utilizamos el mecanismo que describí hace un momento: las interrupciones diferidas. En cualquier caso necesitaremos otorgar el semáforo (binario o contador) desde dentro de la ISR, pero no puedes usar a la función xTaskNotifyGive(), ya que las llamadas a las funciones de FreeRTOS desde dentro de una ISR tienen un comportamiento diferente a cuando son llamadas desde funciones regulares.

FreeRTOS incluye muchas funciones espejo que deben ser utilizadas desde dentro de las ISRs; para el caso que nos ocupa esta función es vTaskNotifyGiveFromISR():

xTaskToNotify. Es el handler de la tarea consumidora, es de decir, aquella tarea que está esperando por el semáforo. Una de las limitaciones de usar semáforos basados en notificaciones es que únicamente una tarea puede ser notificada. Si un mismo evento debe ser notificado a varias tareas, entonces tendrías que usar varios semáforos.

pxHigherPriorityTaskWoken. Esta función pondrá este parámetro a pdTRUE si la llamada saca del estado Block (es decir, despierta) a una tarea que estuviera esperando por la notificación, y que además dicha tarea tuviera una prioridad mayor que la tarea que actualmente se estuviera ejecutando. Si este es el caso, entonces tú como programador podrías llevar a cabo un cambio de contexto (en inglés, context switch) para que la tarea con mayor prioridad se ejecute saliendo de la ISR. No es obligatorio realizar el cambio de contexto, pero de lo contrario, de haber despertado a una tarea con mayor prioridad ésta deberá esperar hasta un nuevo cambio de contexto del sistema operativo (por ejemplo, en el siguiente tick) o que alguna tarea devuelva voluntariamente la CPU (a través de la macro taskYIELD()).

Valor devuelto. A diferencia de su función espejo, ésta no devuelve nada.

El acceso a las interrupciones en Arduino es algo complicado, así que en lugar de usar una ISR real, simplemente vamos a simular una, y aunque no podremos probarla, espero que te quede claro el uso de esta función.

void producer_ISR()
{
   isr_ack();
   // "reconocemos" la interrupción (ack -> acknowledge)


   BaseType_t pxHigherPriorityTaskWoken = pdFALSE;

   xTaskNotifyGiveFromISR( consumer_handler, &pxHigherPriorityTaskWoken );

   if( pxHigherPriorityTaskWoken != pdFALSE ) taskYIELD;
   // cambio de contexto opcional, pero recomendable
}

Puedes usar con toda seguridadLa función ulTaskNotifyTake() tanto en funciones regulares como en ISRs.

¿Qué sigue?

En la lección de hoy hemos visto cómo utilizar las notificaciones directas a la tarea como semáforos. Vimos ejemplos de semáforos binarios y contadores, y cómo otorgar al semáforo desde dentro de una ISR.

A diferencia de los semáforos propiamente dichos, hacerlo de esta manera ahorra muchos recursos, pero como nada es gratis en la vida, existen compromisos. Uno de los principales es que solamente una tarea puede ser notificada. Los semáforos «reales» (es decir, aquellos incluídos como tal en FreeRTOS) pueden notificar a varias tareas. Así mismo, este mecanismo no permite iniciar en un valor diferente de 0, ni establecer una cuenta máxima. Sin embargo, el abanico de aplicaciones donde pueden ser usados hace que valga la pena conocerlos. En una lección posterior veremos los semáforos «reales» de FreeRTOS.

Finalmente, ten en cuenta que los semáforos no son la panacea de la programación concurrente, ya que traen asociados sus propios problemas; sin embargo, enlistar éstos y buscar las posibles soluciones va más allá del ámbito de esta lección, y quizás merezcan la suya propia. La buena noticia es que muchos de los objetos de FreeRTOS para pasar información (notificaciones, colas, mensajes, flujos) incluyen mecanismos de sincronización («despiertan» o sacan del estado Block a las tareas consumidoras) que hace que utilizar semáforos «en bruto» sea el plan B.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.

Software timers

Muchas de nuestras aplicaciones están comandadas por el tiempo: cada cierto tiempo se debe actualizar el display; cada cierto tiempo se debe leer el teclado, cada cierto tiempo se debe realizar una conversión en el ADC, etc.

En la programación convencional (sin sistema operativo) nos hemos visto obligados a crear máquinas de estado dentro de una interrupción periódica para procesar todas esas actividades. Y aunque que esta solución efectivamente funciona, es difícil que escale.

Por otro lado, cuando usamos un sistema operativo para el manejo de esas tareas periódicas debemos crear una tarea por cada una de ellas. Cada tarea, por simple que sea (tal como leer un teclado o actualizar un display), consume recursos. Y tarea a tarea, los recursos se terminan.

La idea de los software timers (temporizadores por software) es tener una tarea a nivel del sistema operativo que centralice y administre a diferentes temporizadores, reduciendo el costo en recursos y simplificando la programación. Tú como programador solamente te encargas de decirle a cada temporizador lo que debe hacer cuando el tiempo establecido expire.

En esta lección veremos cómo puedes utilizarlos en tus proyectos de tiempo real.

Tabla de contenidos

¿Qué es y qué necesito?

Los temporizadores por software (de aquí en adelante les diré simplemente temporizadores o timers) son un mecanismo de FreeRTOS que nos permite temporizar actividades de manera muy fácil y con un mínimo de recursos. Luego de activar esta funcionalidad, dentro de FreeRTOS existirá una tarea (oculta a nosotros) que administrará cada uno de los temporizadores que hayamos creado, ya sin nuestra intervención.

Cada temporizador puede ser iniciado, detenido o reiniciado. Cada uno de estas acciones (FreeRTOS les dice comandos) puede ser lanzado por la propia callback, por otra callback, o por otra tarea.

Un dato interesante es que los temporizadores no dependen de otras tareas en el sistema (las tuyas), lo cual permite que ¡tengas una aplicación completa basada en temporizadores sin tareas! Podrás imaginarte cómo lograrlo después de ver los ejemplos más adelante.

Otra característica de los temporizadores es que los puedes crear como de una sola vez (one-shot) o repetitivos (auto-reload), y después, si es necesario, cambiar este comportamiento (a esta característica le decimos el modo, o mode en inglés). Así mismo, es posible cambiar el periodo (es decir, el tiempo entre llamadas) cuando el programa se está ejecutando.

Un elemento central de los temporizadres son las funciones callback, por lo que es lo primero que debemos revisar.

Funciones callback

Una función callback es una función, Fa, que tú le pasas a otra función, Fb, como parámetro para que ésta la ejecute; es decir, la función Fb recibe y ejecuta el código de Fa.

Cada vez que creas un temporizador deberás pasar una callback para que cuando el tiempo expire dicho código sea ejecutado, en cualquiera de los dos modos de operación de los temporizadores (one-shot o auto-reload).

La firma de las callbacks que los temporizadores utilizan es así:

Esto es, la función que escribas para que se ejecute cada vez que el tiempo expire no devuelve ningún resultado (void) y tiene un parámetro (TimerHandle_t). Por ejemplo, una función update_display() que actualiza un display de 7 segmentos cada cierto tiempo se vería así (omitiendo el código real):

void update_display( TimerHandle_t xTimer )
{
    // aquí el código que actualiza al display
}

El parámetro xTimer (al cual puedes nombrar como tú quieras) sirve para saber cuál temporizador mandó llamar a la callback. FreeRTOS permite que diferentes temporizadores llamen a la misma callback, por lo tanto, ésta debe de saber quién la llamó para ejecutar el código correcto. Pero si tu callback es llamada por un único temporizador, entonces podrías ignorarlo, o darle otros usos, como lo explicaré más adelante.

Demonios y daemons

A la tarea administradora de los timers le llamamos daemon (demonio en español).

El término daemon ha sido utilizado desde el principio de las computadoras para indicar procesos (o tareas) del sistema operativo que no son interactivas y que se ejecutan de manera oculta al usuario.

Cuando habilitas los temporizadores en FreeRTOS, éste crea una tarea que estará corriendo sin que te des cuenta (de ahí el término daemon) verificando el tiempo de finalización de cada temporizador.

Habilitación de los temporizadores

Necesitas 4 simples pasos para habilitar esta funcionalidad en tus proyectos:

1er paso. En el archivo de configuración FreeRTOSConfig.h asegúrate de tener las siguientes líneas (en cualquier lugar del archivo):

#define configUSE_TIMERS                    1
#define configTIMER_TASK_PRIORITY           ( ( UBaseType_t ) 1 )
#define configTIMER_QUEUE_LENGTH            ( ( UBaseType_t ) 3 )
#define configTIMER_TASK_STACK_DEPTH        configMINIMAL_STACK_SIZE * 2

configUSE_TIMERS: La pones a 1 para activar la funcionalidad de timers en FreeRTOS.

configTIMER_TASK_PRIORITY: El daemon de los temporizadores es a final de cuentas una tarea (interna a FreeRTOS) por lo cual requiere una prioridad. Una baja prioridad podría dar lugar a que los comandos no se ejecuten ya que tareas con mayor prioridad no le presten la CPU al daemon. Una alta prioridad (junto con un mal diseño) podría dar lugar a que el daemon consuma todo el tiempo de CPU.

configTIMER_QUEUE_LENGTH: Esta es una cola de comandos. Un comando es una llamada a las funciones (las cuales veremos más adelante) que inician, detienen, reinician o destruyen a los temporizadores. El hecho de hacer la llamada no significa que la instrucción le llegue de inmediato al daemon, y en consecuencia, al temporizador referido. Si hubiera tareas de mayor prioridad al daemon ejecutándose, entonces cada comando se inserta en una cola, y una vez que el daemon tiene tiempo de CPU los comandos se extraen de la cola y, efectivamente, se ejecutan sobre el o los timers indicados. El valor para esta constante simbólica dependerá de la carga de trabajo del sistema, el diseño, y la prioridad del daemon.

configTIMER_TASK_STACK_DEPTH: Esta constante tiene sentido cuando está habilitada la creación dinámica de objetos (configSUPPORT_DYNAMIC_ALLOCATION está puesta a 1). Aquí estableces cuánta memoria le quieres dar al daemon. En el caso de que la creación dinámica estuviera deshabilitada, entonces tú deberás proporcionar la memoria para el daemon (revisa esta lección si no recuerdas cómo hacerlo).

2do paso. Asegúrate que los archivos timers.h y timers.c están en la carpeta src donde están el resto de archivos de FreeRTOS. En el proyecto Molcajete ya están donde deben de estar, así que no deberías preocuparte.

3er paso. Si decides utilizar la forma estática para crear a los temporizadores (de manera exclusiva o junto con la forma dinámica) deberás declarar una función callback para obtener memoria para los temporizadores (nada que ver con las callbacks que hemos estado platicando). Se oye complicado, pero no tienes nada de que preocuparte: FreeRTOS te dice cómo debe ser esa función (aquí puedes verlo) y yo ya la puse donde tiene que ir; en otras palabras no tienes que hacer nada, solamente saberlo. (Si estás interesado busca en el directorio src el archivo static_mem_callbacks.c que yo agregué como parte del proyecto Molcajete.)

4to paso. En los archivos de tu proyecto donde utilices a los temporizadores deberás agregar el archivo de encabezado timers.h:

#include <timers.h>

Desarrollo

Creación de los temporizadores

Tanto el daemon que administra los temporizadores como los temporizadores, pueden ser creados con memoria dinámica o con memoria estática (revisa esta lección y esta para que recuerdes la diferencia entre estas dos formas). Aunque para la mayoría los ejemplos que veremos estaré usando la forma dinámica ya que es más fácil de implementar, te recuerdo que la creación estática de objetos es mejor y más segura.

Las funciones para la creación dinámica y estática de los temporizadores son casi idénticas, de no ser por un parámetro extra para la segunda, por eso voy a utilizar a esta última para explicar los diferentes parámetros.

Temporizadores con memoria estática

pcTimerName: Es una cadena de texto que representa un nombre que le quieras dar al temporizador. FreeRTOS no lo utiliza internamente, pero tú lo puedes utilizar cuando estés depurando tus aplicaciones. El nombre lo puedes obtener posteriormente con la función pcTimerGetName().

xTimerPeriod: Es el periodo, en ticks, del temporizador. Puedes usar la macro pdMS_TO_TICKS() para utilizar milisegundos (ms). Por ejemplo, para un periodo de 500 ms, escribirías pdMS_TO_TICKS( 500 ).

uxAutoReload: Este parámetro es el modo. Cuando escribes pdFALSE estableces que el temporizador será de una sola vez (one-shot), mientras que cuando escribes pdTRUE estableces que será continuo (auto-reload). One-shot significa que cuando expire el periodo la callback se llamará y luego el temporizador entrará al estado dormant (durmiente) hasta que reciba un nuevo comando de inicio. Auto-reload significa que la callback se estará llamando de manera repetida cada vez que expire el periodo sin necesidad de enviar el comando de inicio. El comando de inicio lo envías con la función xTimerStart().

pvTimerID: Una misma callback puede ser llamada por varios temporizadores. En este parámetro puedes establecer un identificador que le ayudará a la callback a determinar cuál temporizador la llamó. Dado que este es un parámetro void* también puedes usarlo para pasarle datos a la callback, o para que ésta mantenga un estado (junto con las funciones TimerSetTimerID() y pvTimerGetTimerID()).

pxCallbackFunction: Es la función que se ejecutará cuando el periodo expire.

pxTimerBuffer: (Este parámetro sólo está definido para la función xTimerCreateStatic()). Es una variable de tipo StaticTimer_t (interna a y) que FreeRTOS utilizará para guardar el estado del temporizador. Deberás crearla global o estática (revisa aquí)

Valor devuelto: Esta función devuelve un handler de tipo TimerHandle_t. Este handler lo necesitan todas las funciones de los temporizadores para saber sobre cuál operarán. En caso de que el temporizador no se haya creado, entonces el valor devuelto es NULL.

Ejemplo 1

Vamos a ver un ejemplo simple utilizando temporizadores estáticos:

#include <FreeRTOS.h>
#include <task.h>
#include <timers.h>

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };

void callback( TimerHandle_t h )
{
   Serial.println( "hola mundo" );
}

void setup()
{
   static StaticTimer_t tmr1_state;

   TimerHandle_t tmr1_h = xTimerCreateStatic(
         "TMR1",               // nombre
         pdMS_TO_TICKS( 500 ), // periodo inicial
         TMR_AUTO_RELOAD,      // tipo (pdTRUE = auto-reload)
         NULL,                 // ID (se puede usar para pasar información)
         callback,             // función a ejecutar
         &tmr1_state );        // variable para guardar el estado

   configASSERT( tmr1_h != NULL );       

   xTimerStart( tmr1_h, 0 );
   // "arranca" al temporizador que acabamos de crear

   Serial.begin( 115200 );

   vTaskStartScheduler();
}

void loop() 
{
}

En este ejemplo podemos observar varias cosas:

No hay ninguna tarea creada por el usuario (tú). La única actividad del programa se lleva a cabo en la callback, la cual es llamada de manera continua por el daemon del temporizador. Que no haya tareas en este ejemplo no quiere decir que siempre será así; lo menciono porque es interesante que podamos crear aplicaciones completas basadas totalmente en temporizadores.

El handler que devuelve la función lo guardamos en una variable local a la función setup() porque es en esta misma función, y por única vez en este ejemplo, donde lo utilizamos. Lo más común es que sea otra función (una callback o una tarea) quien lo use. En esos casos deberás crear al handler (es decir, la variable tmr1_h) global.

Como el temporizador fue creado de manera estática debíamos crear su variable de estado de tal modo que fuera permanente durante toda la ejecución del programa. Teníamos dos opciones, que fuera global, o hacerla estática a la función. Escogí la segunda por ser más segura:

static StaticTimer_t tmr1_state;

Si utilizas a la función xTimerCreate() en su lugar, entonces este último paso no es necesario.

Habrás notado que en lugar de escribir directamente en el parámetro de modo el valor pdTRUE (modo continuo) agregué una enumeración con símbolos más claros; no es necesario, pero el código se entiende mejor:

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };

La callback es muy sencilla, y para este ejemplo no guarda ningún estado ni utiliza el parámetro asociado al handler del temporizador que la llamó. Eso lo veremos en un siguiente ejemplo.

La salida es como la esperábamos:

Funciones para iniciar, detener y reiniciar

Antes de ver el siguiente ejemplo detengámonos un momento para ver a algunas de las funciones que controlan la operación de los temporizadores. Comencemos con la función de arranque, xTimerStart():

xTimer: Es el handler del temporizador que quieres iniciar. Éste lo obtuviste cuando lo creaste.

xBlockTime: Es el tiempo de espera, en ticks, para que el comando llegue a la cola de comandos antes de abortar la operación. Ya mencioné que emitir la instrucción no significa que el temporizador la recibirá de forma inmediata, y en caso de que el sistema esté ocupado, ésta se encolará (¿recuerdas la constante configTIMER_QUEUE_LENGTH?). Para convertir milisegundos a ticks puedes usar la macro pdMS_TO_TICKS(). Para un tiempo de espera infinito escribe portMAX_DELAY (asegúrate que la constante INCLUDE_vTaskSuspend, en el archivo FreeRTOSConfig.h, está puesta a 1); para regresar inmediatamente (es decir, sin esperar, puedes escribir 0. Este parámetro es ignorado cuando la función se llama antes de que el planificador inicie; por eso en el ejemplo anterior escribí el valor 0.

Valor devuelto: Si la instrucción no pudo ser encolada antes de que el tiempo establecido por xBlockTime expirara, entonces devolverá pdFAIL. En caso de que el comando sí alcance lugar en la cola, entonces devolverá pdTRUE. La instrucción será efectivamente ejecutada cuando el daemon obtenga tiempo de CPU.

Una función muy importante que quizás llegarás a utilizar con frecuencia es la que detiene al temporizador, xTimerStop():

Los parámetros y el valor devuelto tienen el mismo significado que la función xTimerStart(). Esta función detiene al temporizador indicado por xTimer que previamente hubiera sido iniciado con la función xTimerStart(). La función no espera a que el tiempo expire; en cuanto el daemon recibe el comando el temporizador es detenido. Por supuesto que podrás volver a arrancarlo.

Un temporizador detenido con xTimerStop() se va al estado Dormant. Esto significa que el temporizador sigue siendo parte de sistema, está inactivo (no usa ciclos de CPU), pero, a diferencia del estado Blocked, no está esperando por ningún evento. Como mencioné, para volverlo a la vida deberás reactivarlo con la función xTimerStart() o xTimerReset().

Ejemplo: Arrancando y deteniendo temporizadores

Ahora sí, otro ejemplo. En éste crearemos dos temporizadores. Uno imprimirá texto, T1, mientras que el otro hará parpadear a un LED, T2. T1 detendrá y arrancará a T2 de manera repetida. Por otro lado, los temporizadores serán creados con memoria dinámica:

#include <FreeRTOS.h>
#include <task.h>
#include <timers.h>

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };

#define BLINKS 3

TimerHandle_t led_h = NULL;
// el handler del timer asociado al LED debe ser global


void msg_callback( TimerHandle_t h )
{
   static uint8_t blinks = BLINKS;
   // usamos una variable estática para guardar el contador.
   // Este método no debe ser usado si la callback es compartida.

   static bool led_state = false;
   // idem


   Serial.print( blinks );
   Serial.println( ": hola mundo" );

   --blinks;
   if( blinks == 0 ) {
      blinks = BLINKS;

      led_state != false ?  
         xTimerStart( led_h, portMAX_DELAY ) :
         xTimerStop( led_h, portMAX_DELAY );

      led_state = !led_state;
   }
}

void led_callback( TimerHandle_t h )
{
   static bool state = false;
   // usamos una variable estática para guardar el estado de esta callback.
   // Este método no debe ser usado si la callback es compartida.

   digitalWrite( 13, state );

   state = !state;
}

void setup()
{
   TimerHandle_t msg_h = xTimerCreate(
         "MSG",                // nombre
         pdMS_TO_TICKS( 1000 ),// periodo inicial
         TMR_AUTO_RELOAD,      // tipo
         NULL,                 // ID
         msg_callback );       // función a ejecutar

   configASSERT( msg_h != NULL );       

   led_h = xTimerCreate(
         "LED",                // nombre
         pdMS_TO_TICKS( 125 ), // periodo inicial
         TMR_AUTO_RELOAD,      // tipo
         NULL,                 // ID
         led_callback );       // función a ejecutar

   configASSERT( msg_h != NULL );       

   xTimerStart( msg_h, 0 );
   // inicia al timer "maestro"
   
   pinMode( 13, OUTPUT );
   // lo usa el timer LED

   Serial.begin( 115200 );
   // lo usa el timer MSG

   vTaskStartScheduler();
}

void loop() 
{
}

La única información nueva que incluí en el ejemplo es la utilización de la constante simbólica portMAX_DELAY en las funciones xTimerStart() y xTimerStop(). Ésta se utiliza para indicarle a FreeRTOS que deberá esperar de manera indefinida por un evento; en otras palabras, que el tiempo de espera xBlockTime será infinito. Lo he hecho de esta manera para que supieras que puedes usarla en tus programas y por simplicidad. Sin embargo, recuerda que en un programa profesional deberás establecer un tiempo de espera para que el sistema responda correctamente ante posibles bloqueos.

Guardando el estado

En el ejemplo anterior guardamos estados y contadores marcando a las respectivas variables como static. Y funciona. Pero como mencioné en el código, éste método no es el adecuado cuando dos o más temporizadores comparten la misma callback. El siguiente ejemplo te muestra cómo utilizar el parámetro ID, junto con las funciones vTimerSetTimerID() y pvTimerGetTimerID(), para guardar un estado.

Antes del ejemplo revisemos muy rápidamente las firmas de dichas funciones:

El punto en común de estas funciones (junto con las de creación de los temporizadores) es que el parámetro para el identificador ID es del tipo void*. Este tipo es oro molido para nosotros los desarrolladores ya que se presta para realizar muchas cosas más allá de las que los diseñadores originales imaginaron. Es decir, la idea del parámetro (y sus funciones asociadas) es utilizarlo para asignar un ID del tipo que nosotros necesitemos (int o char, por ejemplo); sin embargo, void* es un lienzo en blanco que podemos utilizar, como lo muestra el siguiente ejemplo, para cosas completamente distintas:

#include <FreeRTOS.h>
#include <task.h>
#include <timers.h>

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };

void led_callback( TimerHandle_t h )
{
   int state = (int)pvTimerGetTimerID( h );

   digitalWrite( 13, state );

   vTimerSetTimerID( h, state == LOW ? (void*)HIGH : (void*)LOW );
}

void setup()
{
   static StaticTimer_t tmr1_state;

   auto tmr1_h = xTimerCreateStatic(
         "TMR1",               
         pdMS_TO_TICKS( 500 ), 
         TMR_AUTO_RELOAD,      
         NULL,                 // aquí podríamos pasar el estado inicial del LED
         led_callback,         
         &tmr1_state );        

   configASSERT( tmr1_h != NULL );       

   vTimerSetTimerID( tmr1_h, (void*)HIGH );
   // el estado inicial del LED es HIGH

   xTimerStart( tmr1_h, 0 );

   
   Serial.begin( 115200 );

   pinMode( 13, OUTPUT );

   vTaskStartScheduler();
}

void loop() 
{
}

Temporizador one-shot

Los ejemplos vistos hasta el momento han utilizado el modo auto-reload, el cual inicia de forma continua al temporizador. Sin embargo, en ocasiones vamos a querer iniciar un temporizador y que al finalizar su tiempo realice alguna acción, y ya; es decir, hará su tarea una sola vez. De ser necesario el programa volvería a arrancarlo.

Esta característica nos puede servir para establecer un tiempo para una acción y olvidarnos de ella (nosotros como programadores, no así el sistema operativo); esto es, nuestras tareas no ocuparán ciclos de CPU para estar revisando si el tiempo terminó.

Ejemplo

Imagina que estás escribiendo un sistema que necesite emitir un sonido intermitente mientras la temperatura de un proceso se mantenga por encima de un cierto nivel, solamente para llamar la atención (lo cual es diferente de una alarma donde la señal sería contínua hasta que alguien resuelva el problema). Por supuesto que podrías escribir el código que se encarga de esto en una de tus tareas, pero un temporizador en modo one-shot te simplificará la vida como lo muestra el siguiente código:

#include <FreeRTOS.h>
#include <task.h>
#include <timers.h>

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };


TimerHandle_t beep_h = NULL;


void read_task( void* pvParameters )
{
   TickType_t last_wake_time = xTaskGetTickCount();

   while( 1 )
   {
      vTaskDelayUntil( &last_wake_time, pdMS_TO_TICKS( pdMS_TO_TICKS( 2000 ) ) );

      float temp = ( analogRead( A0 ) * 5.0 * 100.0 ) / 1024;
      // el sensor es un LM35

      if( temp > 25.0 ){

         // aquí apagaríamos al actuador

         digitalWrite( 13, HIGH );
         // simulamos que activamos la señal audible

         xTimerStart( beep_h, portMAX_DELAY );
         // arrancamos al timer
      }

      Serial.print( "Temp = " );
      Serial.println( temp );
   }
}

void beep_callback( TimerHandle_t h )
{
   // llegaremos aquí cuando el tiempo expire

   digitalWrite( 13, LOW );
   // simulamos que desactivamos la señal audible
}

void setup()
{
   xTaskCreate( read_task, "TEMP", 128, NULL, tskIDLE_PRIORITY, NULL );
   // esta tarea realiza las lecturas. Pudo haber sido un timer también,
   // pero quise cambiarle un poco

   beep_h = xTimerCreate(
         "BEEP",               // nombre
         pdMS_TO_TICKS( 500 ), // duración del beep
         TMR_ONE_SHOT,         // una sola vez
         NULL,                 // no lo usamos
         beep_callback );      // función a ejecutar

   configASSERT( beep_h != NULL );       

   pinMode( 13, OUTPUT );
   // emula a un búzer

   Serial.begin( 115200 );
   // lo usa la tarea principal

   vTaskStartScheduler();
}

void loop() 
{
}

Un diagrama de tiempo de este ejemplo es así:

Reiniciando al temporizador

El último tema que veremos en esta lección es sobre el reinicio (reset) de los temporizadores, a través de la función xTimerReset().

Esto es, puedes hacer que el tiempo se reinicie mientras el temporizador está activo. Una aplicación que se me ocurre es la luz de fondo (backlight) de una pantalla LCD de 16×2. Cuando el usuario presione una tecla la luz del LCD se encenderá por, digamos, 5 segundos. Si el usuario no volviera a presionar ninguna tecla dentro de esa ventana de tiempo, entonces la luz se apagaría; pero si el usuario presionara una tecla dentro de la ventana, entonces el temporizador se reiniciaría a 5 segundos otra vez. Y así sucesivamente mientras el usuario siga presionando teclas. Un ejemplo alternativo, pero similar, es de un sensor de presencia que mientras detecte que hay gente en un pasillo mantendrá las luces encendidas; una vez que deja de detectar personas y la ventana de tiempo se cierra, entonces las luces se apagan.

La firma de esta función es:

Los argumentos y el valor devuelto tienen el mismo significado que los de las funciones xTimerStart() y xTimerStop(), por lo cual no los voy a repetir aquí.

Un punto interesante a destacar, y que facilita la programación, es que si el temporizador está activo, entonces el tiempo se reinicia; pero si el temporizador está inactivo (es decir, en estado Dormant), entonces esta función lo arrancará, como si hubiese sido una llamada a xTimerStart().

El siguiente programa simula el encendido de la luz de fondo de un LCD:

#include <FreeRTOS.h>
#include <task.h>
#include <timers.h>

enum { TMR_ONE_SHOT = pdFALSE, TMR_AUTO_RELOAD = pdTRUE };


TimerHandle_t button_h = NULL;
TimerHandle_t backl_h = NULL;


void button_callback( TimerHandle_t h )
{
   static uint8_t state = 0;
   
   static uint8_t ticks = 0;

   uint8_t pin = (uint8_t)pvTimerGetTimerID( button_h );
   // recuperamos el pin donde está conectado el push-button

   switch( state ) {

   case 0:
      if( digitalRead( pin ) == LOW ){
         state = 1;

         ticks = 5;
         // debouncing de 50 ms (10ms * 5)

      }
   break;

   case 1:
      --ticks;
      if( ticks == 0 ){

         if( digitalRead( pin ) == LOW ){

            digitalWrite( 13, HIGH );
            // encendemos la luz de fondo del LCD

            xTimerReset( backl_h, portMAX_DELAY );
            // si el timer está activo, lo reinicia;
            // en caso contrario lo arranca (como lo haría xTimerStart)
            
            state = 2;
            // esperamos a que deje de presionar el push-button

         } else{ // fue ruido:
            state = 0;
         }
      }
   break;

   case 2:
      if( digitalRead( pin ) != LOW ) state = 0;
   break;

   default:
      state = 0;
   break;
   }
}

void backl_callback( TimerHandle_t h )
{
   digitalWrite( 13, LOW );
   // apagamos la luz de fondo del LCD
}


#define PUSHBUTTON_PIN 2

void setup()
{
   button_h = xTimerCreate(
         "BTN",
         pdMS_TO_TICKS( 10 ),
         TMR_AUTO_RELOAD,
         
         (void*) PUSHBUTTON_PIN, 
         // usamos al campo ID para pasar el número de pin
         
         button_callback );

   backl_h = xTimerCreate(
         "BCKL",               
         pdMS_TO_TICKS( 5000 ),
         TMR_ONE_SHOT,         
         NULL,                 
         backl_callback );     

   pinMode( 13, OUTPUT );
   // emula a la luz de fondo del LCD

   pinMode( PUSHBUTTON_PIN, INPUT );
   // nuestro push-button

   Serial.begin( 115200 );
   // lo usa la tarea principal


   xTimerStart( button_h, 0 );
   // arrancamos al temporizador "maestro"

   vTaskStartScheduler();
}

void loop() 
{
}

La API de FreeRTOS y las interrupciones

En todos los ejemplos que hemos visto a lo largo de este curso hemos estado llamando a las diferentes funciones de FreeRTOS desde nuestras funciones y tareas; sin embargo, en la vida real vamos a querer obtener la misma funcionalidad desde las interrupciones.

Muchas de las funciones de FreeRTOS que hemos visto (y muchas otras que no hemos visto) no deben ser llamadas desde una interrupción, so pena de un mal-funcionamiento del programa. Para esos casos FreeRTOS provee funciones espejo que pueden ser llamadas con toda seguridad desde dentro de una interrupción. Estas funciones espejo tienen la terminación FromISR():

Funciones de la API de los software-timers de FreeRTOS.

En el caso que nos ocupa en esta lección, de la imagen anterior podrás observar diversas funciones con terminación FromISR(). Esas son las que debes llamar si lo necesitas hacer desde dentro de una interrupción (ISR son las siglas de Interrupt Service Routine).

RECUERDA
Cuando tengas que llamar a alguna función de FreeRTOS desde una interrupción, verifica que estás llamando a la versión correcta.

¿Qué sigue?

Los temporizadores (software-timers) son una adición reciente de FreeRTOS que, como pudiste observar, simplifican nuestras aplicaciones gracias a que la administración de los diferentes temporizadores se delega a una tarea central integrada en el sistema operativo (el daemon). Y seamos honestos, mucho de nuestro tiempo de diseño y desarrollo lo invertimos en actividades temporizadas, y lo queramos o no, terminamos escribiendo temporizadores. Cuando nos veamos haciendo esto deberíamos considerar utilizar los software timers tomando en cuenta que si dichas actividades son complejas, escogeríamos las tareas; pero si no son tan complejas, escogeríamos temporizadores.

En esta lección vimos las operaciones más importantes de los temporizadores, pero como viste en la imagen de la API de los mismos, hay muchas más funciones, las cuales puedes revisar por tu cuenta. Sin embargo, con lo que estudiamos es suficiente para que comiences a integrarlos a tus aplicaciones.

RECUERDA (otra vez): Cuando llames a funciones de FreeRTOS desde interrupciones, verifica que estás llamando a la versión correcta. (Debía insistir en este punto.)

En la siguiente lección veremos semáforos binarios y semáforos contadores utilizando las notificaciones directas a las tareas (direct to task notifications). Éstas ya las vimos, pero ahora lo haremos desde el punto de vista de la sincronización de tareas.

Índice del curso

Espero que esta entrada haya sido de tu interés. Si fue así podrías suscribirte a mi blog, o escríbeme a fjrg76 dot hotmail dot com, o comparte esta entrada con alguien que consideres que puede serle de ayuda.


Si encuentras este blog interesante, entonces podrías considerar suscribirte a él y recibir información relevante sobre tecnología y sistemas embebidos, y de vez en cuando, uno que otro regalo.