Variables de condición y monitores, I

Esta entrada la quiero presentar a través de dos ejemplos y algunas preguntas.

Imagina un sistema concurrente o multitarea con sistema operativo, como muchos de los que hemos estado escribiendo. Imagina que tienes una estructura de datos Cola (Queue en inglés) y que es compartida por varias tareas, es decir, dos o más tareas tratarán de escribir o leer de ella. Los problemas que se presentan son:

  1. ¿Qué haces si la cola está vacía y una o más tareas necesitan leer de ella?
  2. ¿Qué haces si la cola está llena y una o más tareas necesitan escribir en ella?
  3. ¿Qué haces si mientras una tarea está escribiendo en la cola otra quiere leer de ella “al mismo tiempo”?

Este otro ejemplo es clásico en el tema que vamos a tratar hoy. Imagina un sistema bancario que puede recibir depósitos y puede entregar dinero en efectivo. Varias personas pueden depositar, mientras que una o más pueden retirar.

  1. ¿Qué haces si dos personas quieren depositar al mismo tiempo?
  2. ¿Qué haces si dos personas quieren retirar al mismo tiempo?
  3. ¿Qué haces si mientras una persona está depositando otra quiere retirar, o viceversa?
  4. ¿Qué haces si quieres retirar una cantidad, pero el saldo no es suficiente?

El punto en común de ambos ejemplos es que hay un recurso compartido que por un lado de ninguna manera debe ser accesado al mismo tiempo, so pena de corromperlo, y por otro lado tiene tiempos de inactividad en que el solicitante debe esperar (no hay datos para desencolar, la cola está llena, saldo insuficiente para cubrir el retiro, etc).

Una solución es que si el recurso está siendo utilizado, los solicitantes se agreguen a una lista de espera y se vayan a dormir. En cuanto el recurso es liberado, uno de los solicitantes en la lista de espera es sacado de la misma y despertado para que use al recurso. A esta solución le vamos a llamar Variable de condición (en inglés, Condition variable).

Y por otro lado tenemos a los monitores. Un monitor es una construcción (en forma de tipo abstracto o clase) que encapsula a un recurso compartido, administra el acceso a éste a través de un semáforo mutex, y presenta un conjunto de operaciones al cliente del mismo. El recurso sólo puede ser utilizado a través de las operaciones públicas, es decir, el recurso no puede ser usado de maneras que no hayan sido previstas; además, es el monitor el que se encarga de la administración del mutex, no el usuario del recurso. Un monitor puede usar o no variables de condición.

En esta primera parte del artículo de hoy voy a mostrarte una implementación de variable de condición para un microcontrolador con núcleo ARM Cortex-M3 y el sistema operativo FreeRTOS. Por supuesto que las ideas expuestas, así como el código, los podrás trasladar a otras arquitecturas y sistemas operativos con un mínimo de esfuerzo. En la segunda parte te mostraré cómo implementar un monitores con y sin variables de condición.

Tabla de contenidos

¿Qué es y qué necesito?

Un semáforo mutex (de aquí en adelante solamente ‘mutex’) puede implementar exclusión mutua simple; esto es, nada más una tarea puede accesar a un recurso compartido (o estar activa dentro de una región crítica) en un tiempo dado. Para que otra tarea accese al recurso, la tarea que tenía el mutex lo debe devolver. Esta situación hace imposible que la tarea que tiene al mutex, y por lo tanto al recurso compartido, sea suspendida y despertarla después cuando una condición se cumpla. (En mi curso de Arduino en tiempo real escribí esta entrada sobre los mutex’es.)

En los ejemplos que dí al principio lo podemos observar: una tarea que quiere escribir a la cola ya tiene el mutex, es decir, ya tiene acceso a la cola, pero ésta está llena, y por lo tanto no se puede escribir en ella, ¿qué debe hacer la tarea que tiene el acceso a la cola y quiere escribir? Puede hacer dos cosas:

  1. Devolver el mutex y salir para después tentar a la suerte e intentar adquirir al mutex (y al recurso) cuando se haya hecho algo de espacio, o
  2. Ser insertada en una cola de espera, irse a dormir, y después ser desencolada de manera automática tan pronto se haya hecho algo de espacio en la cola.

El punto 2 es la definición de una variable de condición: una tarea a la espera del mutex es encolada y suspendida, y cuando la condición (en este ejemplo, hay espacio en la cola) se cumple, la tarea es desencolada, despertada, adquiere el mutex y se pone a trabajar. La cola de espera permite que varias tareas estén suspendidas (dormidas) a la espera del mutex, y cada una lo obtendrá tan pronto la condición se cumpla.

Una variable de condición es un tipo abstracto que presenta, al menos, las siguientes operaciones:

  • CWait(). Si el mutex no está disponible, entonces la tarea que hizo la llamada a esta función se encola y se suspende.
  • CNotify(). Avisa que la condición se cumplió, y por lo tanto, la tarea que lleva más tiempo en la cola de espera es despertada y puesta a trabajar. Si no hubiera ninguna tarea esperando, entonces no se hace nada.

(La letra C en los nombres anteriores es de Condition, pero es lo de menos.)

En su forma más básica los atributos de este tipo abstracto es una estructura de datos Cola, es decir, el primer elemento en haber entrado (el más antiguo) será el primero en salir (en inglés, FIFO: First-in, First-out). Aunque el mutex puede ser parte del mismo, la mayoría de implementaciones que he visto lo manejan de forma externa, es decir, son parte de la lista de argumentos de la función CWait(). Quizás más adelante modifique (o mejor aún, tú modifiques) la clase CondVar para que también provea al mutex.

Así mismo, además de las dos funciones básicas mencionadas, también se podrían incluir las siguientes tres:

  • CWaitFor(). Igual que CWait(), pero si no obtiene al mutex antes de que el tiempo (relativo a cuando se hizo la llamada) establecido expire sale y devuelve el valor false.
  • CWaitUntil(). Igual que CWait(), pero si no obtiene al mutex antes de que el tiempo (absoluto con referencia a una marca de tiempo en el pasado) establecido expire sale y devuelve el valor false.
  • CNotifyAll(). Igual que CNotify(), pero despierta a todas las tareas que estuvieran en la cola de espera.

El ejemplo que te voy a mostrar más adelante incluye las funciones CWait() y CNotify(). Y hablando de ejemplos, antes de ver los pseudo códigos correspondientes creí necesario que supieras que tanto las variables de condición, como los monitores (que veremos en la segunda parte), pueden ser codificados de manera procedimental (en lenguaje C), u orientada a objetos (con clases en lenguaje C++, Java, C#, etc). El ejemplo te lo voy a mostrar en C++ con objetos, pero verás que no tendrás ningún problema es hacerlo en C plano.

Operación CWait

El núcleo de una variable de condición es la operación CWait(), cuyo pseudo código es:

CWait( mutex )
{
    Inserta a la tarea en la cola de espera
    Devuelve el mutex
    La tarea se suspende

    // esta es la instrucción que la tarea ve cuando es despertada:
    Adquiere al mutex
}

Ojalá su implementación fuera tan simple como su pseudo código. Nota que cuando la tarea es despertada lo primero que hace es intentar adquirir al mutex de nuevo (en la línea 4 lo devolvió).

Ahora, entre el momento de que la tarea fue despertada y el momento en que obtuvo al mutex de regreso, muchas cosas pudieron haber sucedido, entre ellas que la condición dejara de cumplirse. Sí, podría pasar. Por ejemplo, si la tarea estaba esperando un espacio para escribir, pero antes de obtener nuevamente el mutex otra tarea llenó la cola, entonces debe volver a esperar. Esta condición se puede manejar así:

Mutex mutex

una_tarea()
{
    Mientras 1 == 1
    {
        Obtiene al mutex

        Mientras la condición no se cumpla
        {
            CWait( mutex )
        }

        // usa al recurso 

        Devuelve al mutex
    }
}

¿Y cómo sabe que la condición se cumplió? ¡Ah! Aquí es donde la función CNotify() entra a escena.

Operación CNotify()

Cuando el código del cliente de la variable de condición detecta que la condición se ha cumplido, entonces manda llamar a CNotify(), cuyo pseudo código es:

CNotify()
{
    Si hay elementos en la cola de espera
    {
        Desencola a la tarea que ha esperado por más tiempo
        La despierta
    }
}

Que una tarea sea despertada no significa que vaya a ser ejecutada inmediatamente; el sistema operativo la pone como candidata a ser ejecutada (es decir, en la lista de tareas Ready) y se ejecutará cuando le corresponda.

¿Cómo se usan ambas funciones, CWait() y CNotify()? Veámoslo en un pseudo código:

una_tarea()
{
    Mientras 1 == 1
    {
        Obtiene al mutex

        Mientras la condición no se cumpla 
        {
            CWait( mutex )
        }

        // usa al recurso 

        Devuelve al mutex
    }
}
    
otra_tarea()
{
    Mientras 1 == 1
    {
        Si la condición se ha cumplido 
        {
            CNotify()
        }
    }
}

La función otra_tarea está revisando el cumplimiento de la condición, y cuando ésta ha sido cumplida, manda llamar a CNotify(), la cual despierta, de existir, a la tarea que más tiempo ha estado esperando. Como la condición ya fue cumplida, el ciclo wait deja de ejecutarse y la tara una_tarea puede accesar al recurso.

Desarrollo

Una vez que ya tenemos una idea de qué son y cómo se usan las variables de condición, es momento de aterrizar los conceptos. Los libros e Internet están llenos de teoría, y me costó mucho trabajo encontrar ejemplos concretos. Por eso, además de realizar la implementación de este tipo de mecanismos, es que quise compartirla contigo por si en algún momento tienes que usarlas.

Función CWait() y compañía

La función CWait() en C/C++, y utilizando al sistema operativo FreeRTOS, queda de la manera siguiente. Como mencioné, si estás utilizando C plano o algún otro sistema operativo me parece que no tendrás problemas en adaptarlo. Al final del artículo te pasaré el repositorio donde podrás descargar el código completo para el microcontrolador LPC1549 de 32 bits, de núcleo Cortex-M3.

   bool CWait( SemaphoreHandle_t& mutex, TickType_t ticks = portMAX_DELAY )
   {
      TaskHandle_t self;

      self = xTaskGetCurrentTaskHandle();
      // obtiene la tarea actual

      taskENTER_CRITICAL();
      {
         // guarda la tarea en la cola de espera:
         configASSERT( this->len < this->max );
         this->queue[ this->tail++ ] = self;
         if( this->tail == this->max ) this->tail = 0;
         ++this->len;

      } taskEXIT_CRITICAL();

      xSemaphoreGive( mutex );
      // devuelve el mutex

      vTaskSuspend( self );
      // la tarea se auto suspende
     
      return xSemaphoreTake( mutex, ticks );
      // intenta reobtener al mutex
   }

Nota que el código que tiene que ver con guardar la tarea (en realidad, su handler o descriptor) en la cola de espera está dentro de una sección crítica. No queremos que sucedan los problemas que mencioné en un principio, es decir, que mientras una tarea está intentando insertarse en la cola, otra, quizás de mayor prioridad, le gane. Usé una sección crítica para que la operación de inserción (y más adelante, la de extracción) fuese atómica, pero también es posible utilizar un mutex dedicado a la variable de condición con el mismo propósito. De hecho muchos libros lo hacen así (en la teoría). En un futuro cambiaré la sección crítica por un mutex dedicado.

CWait() espera de manera indefinida para obtener el mutex. Su argumento ticks indica que espere por siempre (gracias al valor por defecto portMAX_DELAY); esto es como si el argumento no existiera, y así debe ser para esta función. Sin embargo, las funciones CWaitFor() y CWaitUntil() sí que lo usan:

   bool CWaitFor( SemaphoreHandle_t& mutex, TickType_t ticks )
   {
      return CWait( mutex, ticks );
   }

   bool CWaitUntil( SemaphoreHandle_t mutex, TickType_t& last, TickType_t ticks ) 
   {
      TickType_t now = xTaskGetTickCount();

      TickType_t until = last + ticks - now;
      // los overflows se manejan de manera automática

      last = until;

      return CWait( mutex, until );
   }

Si estás familiarizado con la función de FreeRTOS vTaskDelayUntil(), no tendras problema para entender a la función CWaitUntil() que funciona de manera similar: el tiempo establecido es absoluto, es decir, está referido a un punto establecido en el pasado, el cual se actualiza antes de salir de la función. Con esto evitas el desplazamiento de las llamadas en la línea de tiempo. Sin embargo, si tu aplicación no tiene problemas con ello, entonces puedes usar CWaitFor(), la cual establece un tiempo de salida relativo a cuando fue llamada, similar a la función de FreeRTOS vTaskDelay(). En esta lección de mi curso gratuito Arduino en tiempo real explico las diferencias y los usos de ambas funciones, vTaskDelay() y vTaskDelayUntil().

Función CNotify() y compañía

La función CNotify() en C/C++, y utilizando al sistema operativo FreeRTOS, queda de la manera siguiente. Es más fácil que su contraparte CWait(), pero igual de importante, ya que es ésta la que vuelve a la vida a las tareas para que continuen con su trabajo:

   void CNotify()
   {
      TaskHandle_t task = nullptr;

      taskENTER_CRITICAL();
      {
         if( this->len > 0 ){
            task = this->queue[ this->head++ ];
            if( this->head == this->max ) this->head = 0;
            --this->len;
         }
      } taskEXIT_CRITICAL();

      if( task != nullptr ) vTaskResume( task );
   }

RECUERDA: Hablando de la misma variable de condición, una tarea llama a CWait(), mientras que otra debe llamar a CNotify(). En otras palabras, las tareas consumidoras llaman a CWait(), mientras que las tareas productoras llaman a CNotify().

La sección crítica evita que si dos tareas llaman a CNotify() al mismo tiempo interfieran entre sí y corrompan a la cola de espera.

CNotify() despierta a la tarea que ha esperado por más tiempo, de haberla. La función CNotifyAll() despierta a todas:

   void CNotifyAll()
   {
      taskENTER_CRITICAL();
      {
         while( this->len ) CNotify();
      } taskEXIT_CRITICAL();
   }

Ejemplo

Ahora que conocemos qué son, qué hacen y cómo se implementan las variables de condición, ya podemos ver un ejemplo simple. El recurso compartido será una cola de caracteres, y habrá una función put() que escriba en ella y una función get() que lea de ella. Para poder escribir es necesario que haya espacio, y para poder leer es necesario que hayan caracteres disponibles. Para solucionar este problema vamos a usar dos variables de condición, una por cada condición (¡Wow!)

Primero declaramos las dos variables de condición:

ConditionV<4> data_avail;
// Variable de condición que indica que hay caracteres disponibles para leer

ConditionV<4> space_avail;
// Variable de condición que indica que hay espacio disponible en la cola de caracteres

El valor 4 indica el máximo número de tareas que la cola de espera de la variable de condición podrá guardar. En tu aplicación tú decides si son más o son menos.

La función put() queda así:

void put( char c )
{
   xSemaphoreTake( mutex, portMAX_DELAY );
   // adquiere al recurso

   while( len >= max ) space_avail.CWait( mutex );
   // la tarea se bloquea si la cola de caracteres está llena
   // la función get() realiza la notificación

   // escribe un carácter en la cola:
   buf[ tail++ ] = c;
   if( tail >= max ) tail = 0;
   ++len;

   data_avail.CNotify();
   // avisa que hay al menos un carácter listo para ser leído,
   // la usa la función get()

   xSemaphoreGive( mutex );
   // devuelve al recurso
}

La función get() es:

char get()
{
   xSemaphoreTake( mutex, portMAX_DELAY );
   // adquiere al recurso

   while( len == 0 ) data_avail.CWait( mutex );
   // la tarea se bloquea si la cola de caracteres está vacía
   // la función put() realiza la notificación

   // lee un carácter de la cola:
   char c = buf[ head++ ];
   if( head >= max ) head = 0;
   --len;

   space_avail.CNotify();
   // avisa que hay al menos un espacio para escribir,
   // la usa la función put()

   xSemaphoreGive( mutex );
   // devuelve al recurso

   return c;
   // devuelve al carácter
}

Las tareas productoras y consumidoras son las siguientes (en realidad es una de cada una, pero las reutilicé) :

void producer( void* pvParameters )
{
   uint32_t offset = (uint32_t)pvParameters;
   char cont = 0;

   char* task_name = pcTaskGetName( NULL );

   char letter = offset;

   while( 1 )
   {
      put( letter++ );
      // intenta escribir un carácter en la cola

      ++cont;
      if( cont == 10 ){
         cont = 0;
         letter = offset;
      }

      vTaskDelay( pdMS_TO_TICKS( ( rand() % 50 ) + 20 ) );
   }
}

void consumer( void* pvParameters )
{
   char* task_name = pcTaskGetName( NULL );

   while( 1 )
   {
      char c = get();
      // intenta leer un carácter de la cola

      vTaskDelay( pdMS_TO_TICKS( ( rand() % 150 ) + 25 ) );
   }
}

Quizás notaste que los tiempos que ambas tareas establecen para irse dormir (y así liberar la CPU) son aleatorios. Esto no es requisito para las variables de condición. Lo hice así con el fin de probar el código en condiciones cercanas a la realidad.

Para completar el ejemplo veamos la creación de las tareas:

   xTaskCreate( producer, "P0", 128,  (void*) 'A', tskIDLE_PRIORITY + 1, NULL ); // prints out: ABC...J
   xTaskCreate( producer, "P1", 128,  (void*) 'a', tskIDLE_PRIORITY + 1, NULL ); // prints out: abc...j
   xTaskCreate( producer, "P2", 128,  (void*) '0', tskIDLE_PRIORITY + 1, NULL ); // prints out: 012...9

   xTaskCreate( consumer, "C0", 128, NULL, tskIDLE_PRIORITY, NULL );
   xTaskCreate( consumer, "C1", 128, NULL, tskIDLE_PRIORITY, NULL )

Código fuente

El código fuente completo de este artículo lo puedes descargar o clonar desde este commit del repositorio del proyecto; pero si quieres obtener la última versión completa, incluyendo al monitor (tema de la segunda parte), lo puedes hacer desde aquí.

¿Qué sigue?

Hoy vimos qué son y cómo se implementan las variables de condición, herramientas muy útiles en la programación multi tarea.

Utilicé secciones críticas en las funciones CWait() y CNotify(). Una sección crítica detiene a todo el sistema porque desactiva las interrupciones. Una mejora, como lo mencioné, sería agregar un mutex dedicado a la variable de condición en lugar de las secciones críticas. Con esto logramos que el sistema siga funcionando independiente de las variables de condición.

Otra modificación muy interesante de cara a los sistemas de tiempo real es cambiar la cola de espera, que es circular, por una cola de espera doble, o una cola de prioridad, de tal manera que las tareas con prioridad alta sean las primeras en ser despachadas. No es fácil tomar la decisión ya que las tareas de alta prioridad podrían quedarse con la CPU y nunca prestársela a las tareas con baja prioridad que estuvieran esperando por el recurso. De cualquier manera vale la pena realizar el análisis y la modificación de la misma. O en su defecto, tener dos versiones de variables de condición: modo “normal” y modo “tiempo real”.

En la segunda parte abordaré el tema de los monitores, los cuales, como también mencioné, pueden o no usar variables de condición.