Interestingly, the producer-consumer code presented previously
works if there is
only one producer and
one consumer thread. However, if there are multiple producers or multiple
consumers, this code fails. For example, if two producers try to produce
to a full queue, both of them may start waiting on not_full
.
If a consumer consumes one element, and calls notify
, both
producers will wakeup. Each of them will, in turn, try to acquire
qlock
and produce an element. However, the consumer only
consumed one element, and the two producers will produce two elements,
causing a queue overflow!
The problem is that the producer threads, do not check the condition
after waking up from a wait()
, and simply assume that the
condition is true. This problem can be fixed by requiring the threads
to check the condition again after returning from wait
. This
can be done by replacing the if
condition with a while
condition, as follows:
char queue[MAX]; //global int head = 0, tail = 0; //global struct cv not_full, not_empty; struct lock qlock; void produce(char data) { acquire(&qlock); while ((head + 1) % MAX == tail) { wait(¬_full, &qlock); } queue[head] = data; head = (head + 1) % MAX; notify(¬_full); release(&qlock); } char consume(void) { acquire(&qlock); while (tail == head) { wait(¬_empty, &qlock); } e = queue[tail]; tail = (tail + 1) % MAX; notify(¬_empty); release(&qlock); return e; }A thread that has just woken up from
wait()
will again
check the condition (because of the while
construct). If
the condition is true, it will go back to sleep (by another call
to wait()
). If the condition is false, it will fall through
and perform its operation. This code is correct, even for multiple
producers and multiple consumers.
In general, usage of condition variables has the following pattern:
struct lock mutex; struct cv condition_variable; acquire(&mutex); while (!condition) { wait(&condition_variable, &mutex); } ... release(&mutex);Here, the
condition_variable
is associated with the
condition
becoming true. The mutex variable is a lock used
for mutual exclusion.
So far, we have studied locks, which are used for mutual exclusion, and condition variables, that are associated with conditions. Semaphores are usually used for "counting". Here are the semantics of a semaphore.
struct sema;
) and
three functions:
void sema_init(struct sema *, int val); void P(struct sema *); // also called wait() void V(struct sema *); // also called signal()The names 'P' and 'V' stand for the dutch names of these operations. The semaphore type maintains an integer state, called
count
, which
is a non-negative counter.
The sema_init
function initializes the counter to val
,
which must be a non-negative integer.
The P
function
decrements the counter by one, if it is strictly positive. If the counter
is 0 (before the decrement operation), then the P
function
waits for the counter to become
positive (greater than zero), before decrementing it.
The V
function increments the counter by one. Also, it
wakes up one process that is waiting on the semaphore to become positive.
Most importantly, all these operations, sema_init
,
P
, and V
are atomic with respect to each other.
Here is a sample implementation of a semaphore using locks and condition variables:
struct sema { int count; struct lock lock; struct cv cv; }; void sema_init(struct sema *sema, int val) { sema->count = val; lock_init(&sema->lock); cv_init(&sema->cv); } void P(struct sema *sema) { acquire(&sema->lock); while (sema->count == 0) { wait(&sema->cv, &sema->lock); } sema->count--; release(&sema->lock); } void V(struct sema *sema) { acquire(&sema->lock); sema->count++; if (sema->count == 1) { notify(&sema->cv); } release(&sema->lock); }
Notice that a semaphore maintains state. This is unlike a condition
variable which maintains no state. For condition
variables, the effect of a notify()
operation is to wakeup any currently waiting threads. If a thread has
not yet gone to sleep, notify()
will have no effect. On
the other hand, the V()
operation (that is somewhat analogous
to notify()
) will increment the counter, even if no thread
is currently waiting inside the P()
operation (which is
somewhat analogous to wait()
). Let's see the implications
of this using concrete examples.
nchars
), and another for the
number of empty slots in the queue (nholes
). We use
one semaphore for each counter, as follows.
char q[MAX]; int head = 0, tail = 0; struct sema nchars, nholes; struct lock mutex; sema_init(&nchars, 0); sema_init(&nholes, MAX); void produce(char c) { P(&nholes); acquire(&mutex); //produce an element. release(&mutex); V(&nchars); } char consume(void) { P(&nchars); acquire(&mutex); //consume an element. release(&mutex); V(&nholes); }Notice that this code is much simpler, as it subsumes much of the counting and waiting logic within
P()
and V()
functions.
Because this pattern is common, semaphores are a convenient abstraction.
Here is an example where multiple threads may call the print()
function, but you want to restrict the maximum number of concurrent
prints to N.
sema_init(&nprinters, N); void print(void) { P(&nprinters); //print command V(&nprinters); }This will ensure that the system has at most N simultaneous print requests. Also, as print requests get completed, they allow other print requests to be admitted.
struct lock { struct sema sema; }; void lock_init(struct lock *l) { sema_init(&l->mutex, 1); } void acquire(struct lock *l) { P(&l->sema); } void release(struct lock *l) { V(&l->sema); }
x
, a second
thread computes y
, a third thread computes
z = f(x, y)
,
and finally a fourth thread prints z
. You may want to enforce
an execution order (schedule) such that the computation of z
happens only after the computation of x
and y
.
Similarly, the print statement should execute only after the computation
of z
. This can be achieved by using semaphores sx
,
sy
, and sz
, initialized to zero. A one value in
these semaphore counters will be used to indicate that the corresponding
value has been computed.
thread1() { .... x = ...; V(sx); } thread2() { .... y = ...; V(sy); } thread3() { .... P(sx); P(sy); z = f(x, y); V(sz); } thread4() { P(sz); print z; }
Because concurrent programs are quite common, some languages provide intrinsic support for handling concurrency. One example of such support is a monitor. A monitor is defined as a type construct (e.g., class), which may declare private shared-memory objects and routines that access those objects. The monitor enforces mutual exclusion between the monitor routines, by using an "implicit lock". Here is an example of the producer-consumer queue implemented as a monitor:
monitor Q { char buf[MAX]; int head = 0, tail = 0; void produce(char c) { while ((head + 1) % MAX == 0) { wait(¬_full); } q[head] = c; head = (head + 1) % MAX; notify(¬_empty); } char consume(void) { ... } }Notice that because the monitor guarantees mutual exclusion between its routines (only one thread can be inside a monitor routine at any time), we did not have to use a mutex ourselves. Instead, the compiler will implicitly generate a mutex for us. One way to implement monitors, is for the compiler to instrument each function with
acquire()
and release()
calls on the implicit lock, before and after
the routine respectively. Also, notice that the wait()
function does not need the second argument; instead the second argument
is automatically understood to be the monitor's implicit lock, i.e., a thread
calling wait() will internally release the monitor's implicit lock
before going to sleep.
Monitors make it simpler to write concurrent programs, and avoid common bugs like unbracketed acquires or releases. In Java, the "synchronized" keyword can be used to specify a monitor.