Introduzione ai sistemi operativi con UNIX
Prec 6. La comunicazione tra i processi Succ

6.2 Il problema del produttore-consumatore

Nel paragrafo precedente abbiamo visto un semplice problema di competizione: l'incremento di una variabile condivisa che conta il numero di esecuzioni di uno stesso programma. Abbiamo individuato il problema di competizione e la sua soluzione più ovvia: si possono perdere dei conteggi se l'istruzione di aggiornamento non è atomica e non interrompibile.

Abbiamo però osservato che rendere atomiche tutte le sezioni critiche può dar luogo ad inefficienza, perché di fatto si può disattivare per lungo tempo il meccanismo della multiprogrammazione.

La soluzione migliore consiste nell'utilizzare uno strumento inventato dall'olandese Dijkstra nei lontani anni '60: i semafori. I semafori, se usati correttamente dal programmatore, operano la sincronizzazione dei processi in modo efficiente. Per sincronizzazione si intende in generale porre dei vincoli temporali (e possibilmente solo quelli strettamente necessari) nella esecuzione delle operazioni dei processi per evitare scorrette computazioni. Per competizione si intende l'accesso da parte dei processi ad una qualunque risorsa comune che non si escluda mutuamente nel tempo, che avvenga in modo concorrente e sovrapposto.

Un semaforo (generico) è una variabile intera positiva o nulla, inizializzata con un valore iniziale a scelta (ma sempre maggiore o uguale a 0); poi c'è una coda dei descrittori dei processi (o puntatori a descrittori dei processi) che sono sospesi (sono nello stato wait dei tre stati principali di un processo) o come si dice con paragone automobilistico, sono in coda sul semaforo.

Un semaforo viene messo a protezione della risorsa comune e garantisce che i processi accedano alla risorsa in maniera mutuamente esclusiva, qualunque sia la loro velocità relativa.

Sui semafori si agisce solamente usando due primitive, la wait() e la signal(). La wait() può essere sospensiva o meno per il processo che la esegue a seconda del valore del semaforo, la signal() invece non è mai sospensiva, non può mai alterare lo stato del processo che la invoca, ma può solo risvegliare uno dei processi in coda.

Il semaforo stesso rappresenta una variabile di memoria condivisa tra due o più processi. La wait() e la signal() rappresentano sezioni critiche per il semaforo che è condiviso. Questo significa che durante una wait() o una signal() nessun altro processo può accedere al semaforo fino a che l'operazione è stata completata. Nel caso più comune in cui vi sia una sola CPU (macchine monoprocessore), l'atomicità della wait() e della signal() si ottiene disabilitando le interruzioni durante la loro esecuzione:

void wait(struct semaphore* sem)
{ disabilita le interruzioni
  if (sem->s==0)
  { sem->q=puntatore al descrittore del processo corrente;
    sospendi il processo corrente (ponilo nello stato wait);
  }
  else
    (sem->s)--;
  riabilita le interruzioni
}

void signal(struct semaphore* sem)
{ disabilita le interruzioni
  if (sem->q!=NULL)
  { poni nello stato ready il processo il cui descrittore è sem->q;
    sem->q=NULL;
  }
  else
    (sem->s)++;
  riabilita le interruzioni
}

È facile verificare che se le primitive wait() e signal() non fossero non interrompibili, il semaforo non potrebbe assicurare la sincronizzazione e non servirebbe a niente. Ad esempio potrebbe verificarsi che un processo esegua la wait(), questa trova che il semaforo è 1, ma poi il processo viene sospeso e non fa in tempo a decrementare il valore del semaforo (azione else della wait()). Supponiamo subentri un altro processo che pure esegue la wait(): trovando che il semaforo è 1, lo pone a zero e non si sospende, e comincia ad eseguire la sezione critica. Prima che abbia finito viene interrotto e riprende ad eseguire il primo processo, che pone il semaforo a -1 (un valore negativo, è già un errore secondo la nostra definizione di semaforo, ma stiamo a vedere cosa accade di realmente sbagliato) e poi esegue anche lui la sua sezione critica. Ecco che l'esecuzione delle sezioni critiche si è sovrapposta. Questo dovrebbe convincervi del perché la wait() e la signal() devono essere atomiche.

È ovvio che per trattare il problema della mutua esclusione quando i processi sono più di due, occorre estendere la definizione di semaforo: invece di un solo descrittore di un processo in coda occorre che il semaforo sia in grado di tenere una lista di descrittori di processi in coda, perché se vi sono più di due processi, più di un processo può essere bloccato sul semaforo.

Se ci sono più processi in coda al semaforo solitamente la signal() ne risveglia uno secondo una politica FIFO (First-In, First-Out): il primo bloccato è il primo ad essere risvegliato, perché è anche quello che sta aspettando da più tempo. Questo è il motivo per cui di solito si sceglie di trattare con una politica FIFO, anziché LIFO (cioè come uno stack), la lista dei processi in coda associata al semaforo.

In signal() la condiz. (sem->q!=NULL) significa che c'è un processo in coda al semaforo. Se è vera il processo viene estratto dalla coda e il suo stato viene cambiato da bloccato a pronto. L'incremento di s significa che la risorsa si è liberata e in generale registra il fatto che una sezione critica ha terminato l'esecuzione.

Un semaforo che assuma solo i valori 0,1 viene detto semaforo binario. Per risolvere il problema di mutua esclusione tra due o più processi concorrenti è sufficiente usare un semaforo binario, perché la risorsa ha solo due stati (libera o occupata). Occorre semplicemente inizializzare il semaforo ad 1 (risorsa libera) e fare un wrapping tramite la wait() e la signal() di ogni sezione critica. Questo naturalmente produce un certo overhead non nullo, vale a dire c'è del tempo che viene sottratto alla esecuzione dei processi per eseguire la wait() e la signal(), ma ciò è indispensabile per garantire una corretta esecuzione.

I semafori sono un meccanismo generale molto potente per risolvere i problemi di sincronizzazione e permettono di risolvere anche problemi più complicati della semplice competizione.

Quest'ultimi sono i problemi di comunicazione, in cui oltre alla mutua esclusione occorre porre un vincolo di ordinamento sulle operazioni che devono quindi seguire una prefissata sequenza (ad esempio nel caso del problema del produttore-consumatore che ora descriveremo, il vincolo di ordinamento è che prima produco, poi consumo, ecc.).

Nei problemi di competizione invece non è rilevante l'ordine in cui le sezioni critiche sono eseguite (nel problema dell'incremento della variabile condivisa ad esempio è evidente che ciò non è rilevante, il risultato finale è lo stesso).

Il problema del buffer condiviso, detto anche problema del produttore e del consumatore, che descriveremo dapprima nella sua forma più semplice, cioè con un buffer costituito da una sola variabile, è un classico problema di comunicazione tra processi. Supponete di avere un buffer che può contenere una stringa lunga ad es. al più 512 caratteri e un processo che scrive solamente una stringa in questo buffer (il produttore), mentre un altro processo concorrente legge solamente il contenuto del buffer (il consumatore).

Se questi processi così interagenti girano concorrentemente senza alcuna misura di sincronizzazione succederà quasi sicuramente un disastro, a meno che non si è proprio fortunati e si verifica una sequenza di elaborazione salva.

C'è un primo problema che è di competizione: il produttore avrà un ciclo di scrittura carattere per carattere nel buffer e similmente il consumatore avrà un ciclo di lettura. Questi due cicli devono essere delle sezioni critiche, altrimenti può accadere, ad esempio, che il produttore si interrompa senza aver finito di scrivere tutta la stringa e che entri poi in gioco il consumatore, che legge una stringa formata da un pezzo iniziale che è la nuova stringa prodotta, seguita da una coda rappresentata dal vecchio contenuto del buffer (si dice che il buffer non è consistente). Questo problema di competizione può essere risolto con un semaforo mutex (cioè un semaforo binario di mutua esclusione, quelli di cui abbiamo parlato finora).

Ma questo non basta: nonostante il semaforo mutex può ancora capitare che il consumatore prelevi due volte lo stesso messaggio o che il produttore sovrascriva un messaggio non ancora consumato con un nuovo messaggio prodotto. Infatti il produttore ha un ciclo del tipo (produci messaggio; inserisci messaggio) e il consumatore ha un ciclo del tipo (preleva messaggio;consuma messaggio), che si ripetono indefinitamente. Può anche succedere che parta il consumatore a prelevare un messaggio prima ancora che il produttore abbia prodotto qualcosa, sicché il consumatore preleva erroneamente il contenuto iniziale del buffer credendolo un messaggio prodotto.

Nel problema del produttore-consumatore generico il buffer è un vettore e può contenere al più N messaggi, tutti dello stesso tipo (ad esempio numero o stringa) e invece di avere un solo processo produttore e un solo processo consumatore, in generale ci sono n processi produttori uguali e m processi consumatori uguali, tuttavia questo non complica la soluzione. Di solito i buffer vengono gestiti come code FIFO circolari e rappresentati tramite array, per consentire un completo utilizzo dell'array evitando costose operazioni di shifting. La seguente è una semplice libreria di funzioni C che permette di gestire un array come buffer circolare:

/* buffer.h */

typedef int buftype; /* buffer element type */

#define N 1024 /* maximum buffer size */

int deposit(buftype x);

int fetch(buftype* x);
/* buffer.c */

#include "buffer.h"

static buftype buf[N]; /* circular buffer */

static int in=0,  /* next location to be written into */
           out=0, /* next location to be read from */
           n=0;   /* number of elements current in the buffer
                     invariant condition: 0<=n<=N */

int deposit(buftype x)
{ if (n==N)
    return -1; /* buffer full */
  n++; /* increase buffer size */
  buf[in]=x;
  in=(in+1) % N;
  return n;
}

int fetch(buftype* x)
{ if (n==0)
    return -1; /* buffer empty */
  n--; /* decrease buffer size */
  *x=buf[out];
  out=(out+1) % N;
  return n;
}

Un programma di test interattivo per questa libreria, che implementa un semplicissimo linguaggio per la manipolazione di code di interi è il seguente:

/* buffertest.c */
#include <stdio.h>
#include <ctype.h>
#include "buffer.h"

int main(int argc, char **argv)
{ buftype x;
  char cmd;

  do
  { if (scanf(" %c", &cmd)!=1)
      break;
    if ((cmd=tolower(cmd))=='q')
      break;

    switch(cmd)
    { case 'h':
        puts("circular buffer");
        puts("(d)eposit");
        puts("(f)etch");
        puts("(h)elp");
        puts("(q)uit");
      break;
 
      case 'd':
        if (scanf("%i", &x)!=1)
          fprintf(stderr, "%s: integer expected after `%c'\n", *argv, cmd);
        else if (deposit(x)==-1)
          fprintf(stderr, "%s: buffer full; cannot deposit `%i'\n", *argv, x);
      break;

      case 'f':
        if (fetch(&x)==-1)
          fprintf(stderr, "%s: buffer empty on fetch\n", *argv);
        else
          printf("%i\n", x);
      break;

      default:
        fprintf(stderr, "%s: command `%c' unknown\n", *argv, cmd);
      break;
    }
  } while (1);
  exit(0);
}

Finchè il buffer non viene condiviso tra più processi, ma le funzioni deposit() e fetch() sono attivate da un singolo processo, che una volta agisce da produttore e un'altra da consumatore, come fa buffertest.c non ci sono problemi. Ma se il buffer circolare viene condiviso da processi concorrenti si verifica il problema di sincronizzazione del produttore-consumatore.

Notate che in quest'ultimo caso non è più un errore per il produttore se la deposit restituisce -1 (che indica che il buffer è pieno). Piuttosto che visualizzare un fastidioso messaggio di errore ripetutamente (ripetutamente perchè il produttore ha un ciclo infinito), il tentativo di inserire in un buffer pieno deve essere considerato del tutto legittimo e in tal caso il produttore deve bloccarsi, andare in stato di wait fino a che non si verifica la condizione di guardia dell'operazione di deposit() che è n<N (buffer non pieno). Quindi l'operazione di inserimento in un buffer pieno non deve essere annullata con errore, bensì solo ritardata fino al momento in cui si libera almeno una posizione nel buffer.

Il vincolo di sincronizzazione duale del precedente è che il consumatore non può prelevare un messaggio dal buffer se questo è vuoto e deve sospendersi, fino al momento in cui un processo produttore inserirà almeno un elemento nel buffer. Anche in questo caso il tentativo di prelevare un messaggio quando il buffer è vuoto non si deve risolvere nella stampa di un messaggio di errore e/o la terminazione prematura del programma, altrimenti i due processi non riuscirebbero a cooperare.

Osservate anche che se non ci fossero le condizioni di guardia n<N per la deposit() e n>0 per la fetch() si andrebbe fuori dai limiti dell'array distruggendo l'organizzazione della memoria del programma e rischiando accessi illegali alla memoria che, come vedremo meglio in seguito in UNIX provocano un'eccezione a runtime o segnale di interrupt detto SIGSEGV, con conseguente terminazione del processo che lo riceve e la creazione di una immagine di memoria salvata su disco del processo impazzito al momento del disastro per consentire più facilmente il debugging del problema da parte del programmatore.

A questo punto la questione è: come imporre i necessari vincoli di sincronizzazione? Scegliamo di imporli dai programmi produttore e consumatore, senza modificare la libreria buffer. Abbiamo già visto che possiamo usare un semaforo di mutua esclusione per assicurare che non si verifichino problemi di competizione nell'accesso al buffer. Ecco un esempio di competizione tra due produttori che stanno facendo una deposit() dei valori 5 e 6, per semplicità supponendo che N sia almeno 2 e il buffer inizialmente vuoto:

processo istruzione n, in, out, buf prima n, in, out, buf dopo
1 n++ n=0, in=0, out=0, buf[0]=? n=1, in=0, out=0, buf[0]=?
1 buf[in]=5 n=1, in=0, out=0, buf[0]=? n=1, in=0, out=0, buf[0]=5
2 n++ n=1, in=0, out=0, buf[0]=5 n=2, in=0, out=0, buf[0]=5
2 buf[in]=6 n=2, in=0, out=0, buf[0]=5 n=2, in=0, out=0, buf[0]=6

Per risolvere questo tipo di problemi, scriveremo il produttore e il consumatore in questo modo, tenendo conto che deposit() e fetch() sono sezioni critiche:

struct semaphore mutex={1, NULL};

void producer(void)
{ buftype message;

  while (1)
  { /* ... message production ... */
    wait(&mutex);
    deposit(message);
    signal(&mutex);
  }
}

void consumer(void)
{ buftype message;

  while (1)
  { wait(&mutex);
    fetch(&message);
    signal(&mutex);
    /* ... message consuming ... */
  }
}

Come abbiamo visto quando abbiamo spiegato i semafori binari, l'idea è la seguente: occorre che il processo produttore, se la risorsa buffer è occupata, aspetti che la risorsa si liberi. Se invece la risorsa è libera la wait() non è bloccante e il processo produttore che la esegue occupa subito la risorsa. Quando il processo produttore ha terminato di lavorare sulla risorsa (la deposit() è terminata), deve segnalare il fatto che la risorsa si è resa disponibile e per questa ragione deve chiamare la signal(), per risvegliare un eventuale altro processo che si era bloccato al semaforo cercando di usare quella risorsa (e in tal caso lo stato della risorsa rimane occupato), o, se non ci sono processi bloccati, per settare come libero lo stato della risorsa. Questo è proprio il comportamento della wait() e della signal() che abbiamo visto.

Analogamente lavora il semaforo binario per il processo consumatore. Notate anche che inizializziamo il semaforo mutex al valore 1: per un problema di mutua esclusione il valore iniziale del semaforo è sempre 1, che sta ad indicare "risorsa condivisa inizialmente libera".

Ma come facciamo per risolvere i problemi di cooperazione? La soluzione probabilmente più semplice da pensare è la seguente:

struct semaphore mutex={1, NULL};

void producer(void)
{ buftype message;
  int ret;

  while (1)
  { /* ... message production ... */
    do
    { wait(&mutex);
      ret=deposit(message);
      signal(&mutex);
    } while (ret!=-1);
  }
}

void consumer(void)
{ buftype message;
  int ret;

  while (1)
  { do
    { wait(&mutex);
      ret=fetch(&message);
      signal(&mutex);
    } while (ret!=-1);
    /* ... message consuming ... */
  }
}

Il busy waiting o polling non è però raccomandabile perché riduce le prestazioni. I processi devono chiedere al sistema operativo di essere sospesi invece di usare dei propri cicli di attesa che impegnano inutilmente la CPU (che invece potrebbe essere assegnata ad altri processi utili).

Siccome la wait() sospende il processo chiamante se necessario, mentre la signal() risveglia un processo sospeso se necessario, e siccome sospensione e risveglio sono le azioni che ci servono, è possibile utilizzare dei semafori per risolvere anche il problema di cooperazione? La risposta è affermativa, ma non si tratta più di semafori binari.

Per imporre il vincolo di sincronizzazione cooperativa sui produttori (che ricordo era che il produttore non può inserire se il buffer è pieno), se guardate come è fatto il codice della wait(), viene naturale introdurre un semaforo condiviso chiamato empty: il valore di questo semaforo indica il numero di posizioni libere nel buffer. Siccome inizialmente il buffer è vuoto, allora il semaforo empty deve essere inizializzato con il valore di N (N è il numero di risorse libere inizialmente, ogni risorsa essendo una locazione del buffer e quindi coincide con la dimensione del buffer). Il produttore, prima di acquisire in modo mutuamente esclusivo la risorsa buffer ed effettuare l'inserimento, esegue la wait() sul semaforo empty per tentare di acquisire una posizione vuota del buffer su cui effettuare l'inserimento o bloccarsi se il buffer è pieno. La signal() sul semaforo empty viene eseguita dal consumatore quando questo ha prelevato un messaggio, per segnalare che si è liberata una risorsa (un elemento del buffer) oppure per risvegliare un processo produttore bloccato per mancanza di elementi liberi nel buffer circolare:

struct semaphore mutex={1, NULL}, empty={N, NULL};

void producer(void)
{ buftype message;

  while (1)
  { /* ... message production ... */
    wait(&empty);
    wait(&mutex);
    deposit(message);
    signal(&mutex);
  }
}

void consumer(void)
{ buftype message;

  while (1)
  { wait(&mutex);
    fetch(&message);
    signal(&mutex);
    signal(&empty);
    /* ... message consuming ... */
  }
}

Il codice precedente non risolve completamente il problema di cooperazione. Occorre naturalmente imporre il vincolo di sincronizzazione sui consumatori (un consumatore non può prelevare se il buffer è vuoto). Questo si impone in modo duale al precedente, introducendo un semaforo che chiamiamo full, il quale col suo valore indica il numero di posizioni occupate nel buffer e quindi inizialmente va inizializzato a zero (supposto il buffer inizialmente vuoto). Il consumatore esegue una wait() su full prima di acquisire il buffer per il prelievo di un elemento. La wait() su full risulterà bloccante per il consumatore se il buffer è vuoto (full vale 0), altrimenti (quando full vale almeno 1) il valore di full viene decrementato di uno per indicare che il processo che ha eseguito la wait() ha acquisito una risorsa (rappresentata da una posizione occupata nel buffer), che si appresterà a consumare. La signal() sul semaforo full viene eseguita invece dal produttore dopo un inserimento, per indicare che vi è una risorsa in più (una posizione piena in più nel buffer) a disposizione di un futuro consumatore oppure per risvegliare un consumatore che si era bloccato perchè non vi erano posizioni libere, ossia elementi da prelevare nel buffer.

La soluzione completa al problema del produttore-comsumatore è quindi la seguente (finalmente!):

struct semaphore mutex={1, NULL}, empty={N, NULL}, full={0, NULL};

void producer(void)
{ buftype message;

  while (1)
  { /* ... message production ... */
    wait(&empty);
    wait(&mutex);
    deposit(message);
    signal(&mutex);
    signal(&full);
  }
}

void consumer(void)
{ buftype message;

  while (1)
  { wait(&full);
    wait(&mutex);
    fetch(&message);
    signal(&mutex);
    signal(&empty);
    /* ... message consuming ... */
  }
}

Quindi abbiamo bisogno di ben tre semafori differenti per risolvere completamente il problema del produttore-consumatore! Infatti abbiamo tre vincoli di sincronizzazione ed usiamo un semaforo separato per soddisfare ciascun vincolo. Uno è un semaforo binario che serve per evitare la competizione tra i processi nell'accesso al buffer ed assume quindi solo i valori 0 e 1. Gli altri due sono semafori (N+1)-ari: il loro valore è un invariante rispetto alla condizione 0≤valore≤N, cioè è sempre compreso tra 0 ed N (estremi inclusi), proprio come la variabile privata n nella libreria buffer.c.

Notate che non è necessario che un processo sia solo consumatore o solo produttore: possiamo avere alcuni processi che a volte agiscono da consumatori e a volte da produttori, purchè utilizzino in entrambi i casi correttamente i tre semafori. Ovviamente se sono tutti produttori e non c'è alcun processo consumatore, o viceversa, la comunicazione perde di significato.


Prec Indice Succ
Problema della variabile condivisa Livello superiore Riepilogo sull'uso dei semafori