Mutexes are very good for preventing multiple threads from executing a critical section concurrently. However, they don't allow for more detailed control over when threads actually execute.
Sometimes, we want threads to execute a piece of code in a specific order. Other times we want particular threads to communicate with each other.
Another communication construct, the semaphore, can help in these situations.
Say we would like to have a critical section that is executed by the threads in order, for example by thread id.
The following program contains a call to printf that can be executed in any thread order:
/* the function called for each thread */
void* worker(void* idp) {
    /* get our thread id */
    int id = * (int*) idp;

    printf("Thread %d says hello!\n", id);
    return NULL;
}
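Using a mutex makes no difference here. It stops two threads from being inside the critical section at once, but it says nothing about which thread gets there first: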
/* the function called for each thread */
void* worker(void* idp) {
    /* get our thread id */
    int id = * (int*) idp;

    pthread_mutex_lock(&mutex);
    printf("Thread %d says hello!\n", id);
    pthread_mutex_unlock(&mutex);
    return NULL;
}
If we want to enforce that a critical section is executed in thread order, we can use busy waiting, where each thread waits its turn based on a global variable:
/* whose turn it is */
int turn = 0;

/* the function called for each thread */
void* worker(void* idp) {
    /* get our thread id */
    int id = * (int*) idp;

    /* busy wait until it is our turn */
    while (turn != id)
        ;

    /* our turn! */
    printf("Thread %d says hello!\n", id);

    /* next thread's turn */
    turn++;
    return NULL;
}
What is the downside of this?
If we compile the above program normally, it should work correctly. However, if we compile it with optimizations:
$ gcc busy-wait.c -O3 -pthread
It may no longer work!
This is because the compiler has made assumptions about our code that are violated by the presence of multiple threads. From the compiler's point of view this code:
    /* busy wait until it is our turn */
    while (turn != id)
        ;
Is either a no-op or an infinite loop.
If we want some optimizations applied, but not ones that will break our program, we should declare any variables that can be changed across threads as volatile:
volatile int turn = 0;
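With that change, the program should work as expected, even with optimizations turned on. Note that volatile only forces the compiler to re-read the variable each time; it does not make accesses atomic or ordered between threads, so portable C11 code would use an atomic type from <stdatomic.h> for this.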
Another solution to this problem is the semaphore. Semaphores are not part of Pthreads. However, they are part of the POSIX standard.
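A semaphore is similar to a mutex, except that instead of being simply locked or unlocked it holds a non-negative integer value, and it can be incremented by any thread, not just the thread that last decremented it.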
To use semaphores, we need:
#include <semaphore.h>
Semaphores are created by declaring a "sem_t" object:
sem_t semaphore;
They are initialized with sem_init:
int sem_init(sem_t* semaphore, int shared, unsigned value);
Instead of lock and unlock, semaphores support "wait" and "post":
int sem_wait(sem_t* semaphore);
int sem_post(sem_t* semaphore);
wait blocks if the semaphore value is 0. When the semaphore becomes positive, it unblocks, and then decrements the semaphore value.
post increments the semaphore value.
The thread ordering can be handled efficiently with semaphores. We create one semaphore for each thread, and initialize them to 0 (locked).
Each thread waits for its semaphore to become unlocked, does its job, then unlocks the semaphore of the next thread in the sequence.
main unlocks the semaphore of thread 0 to start.
Together, this scheme orders the threads predictably:
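#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>

/* number of threads to spawn (4 is an arbitrary choice for this example) */
int num_threads = 4;

/* a semaphore for each thread */
sem_t* semaphores;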

/* the function called for each thread */
void* worker(void* idp) {
    /* get our thread id */
    int id = * (int*) idp;

    /* wait for our semaphore to become unlocked */
    sem_wait(&semaphores[id]);

    /* do our work */
    printf("Thread %d says hello!\n", id);

    /* unlock the semaphore belonging to the next thread (unless last) */
    if (id < (num_threads - 1)) {
        sem_post(&semaphores[id + 1]);
    }
    return NULL;
}

int main (int argc, char** argv) {
    /* create the semaphores */
    sem_t s[num_threads];
    semaphores = s;

    /* initialize the semaphores to 0 */
    for (int i = 0; i < num_threads; i++) {
        sem_init(&semaphores[i], 0, 0);
    }

    /* unlock thread 0 */
    sem_post(&semaphores[0]);

    /* an array of threads */
    pthread_t threads[num_threads];
    int ids[num_threads];

    /* spawn all threads */
    for (int i = 0; i < num_threads; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }

    /* join all threads */
    for (int i = 0; i < num_threads; i++) {
        pthread_join(threads[i], NULL);
    }
    return 0;
}
Semaphores can also be used to pass messages between threads by id. The semaphore is the signal from one thread to another that the message (stored in some global structure) is ready to be read.
A common pattern where one thread must signal another is the producer/consumer pattern. Here, one thread is producing values that are input to another thread.
Consider a video playing application where we split the process into three threads: thread 0 downloads the video data, thread 1 decompresses it, and thread 2 plays it.
This is also an example of task parallelism as opposed to data parallelism.
In order to stay synchronized, we can create a buffer between threads 0/1 and 1/2 which stores the data for the next thread. Then we create 4 semaphores representing the different conditions we need to track: download buffer empty, download buffer full, decompress buffer empty, and decompress buffer full.
Both buffers are initially marked as empty and not full.
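The download thread waits for the download buffer to be empty, fills it with data, then signals that it is full.
The decompress thread waits for the download buffer to be full and for the decompress buffer to be empty, moves the data across, signals that the download buffer is empty, decompresses the data, then signals that the decompress buffer is full.
Finally, the playing thread waits for the decompress buffer to be full, displays the data, then signals that the decompress buffer is empty.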
Semaphores provide a convenient way to communicate between threads, in a variety of situations.
The following program illustrates this producer/consumer pattern. To simplify matters, "downloading" reads a string from stdin, "decompressing" capitalizes it, and "playing" prints it.
Each function loops until the "download" thread hits the end of file. In your shell, this is done by hitting Ctrl-D.
#include <string.h>
#include <ctype.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>
/* buffer of downloaded data */
char download_buffer[512];
sem_t download_empty;
sem_t download_full;
/* buffer of decompressed data */
char decompress_buffer[512];
sem_t decompress_empty;
sem_t decompress_full;
/* when to quit the program */
volatile int quit = 0;

/* the function called for the first thread, downloading the video frames */
void* download(void* p) {
    /* loop until end of input */
    while (!quit) {
        /* wait until the buffer is empty */
        sem_wait(&download_empty);

        /* fill it with data (really just a string from stdin);
           the width keeps scanf inside the 512-byte buffer */
        if (scanf("%511s", download_buffer) != 1) {
            /* EOF: pass on an empty string instead of stale data, and quit */
            download_buffer[0] = '\0';
            quit = 1;
        }

        /* tell the decompress thread that it's full */
        sem_post(&download_full);
    }
    return NULL;
}

/* the function called for the second thread, decompressing the video */
void* decompress(void* p) {
    /* loop until told to quit */
    while (!quit) {
        /* wait for the download buffer to be full */
        sem_wait(&download_full);

        /* wait for the decompress buffer to be empty */
        sem_wait(&decompress_empty);

        /* move from download buffer to decompress */
        strcpy(decompress_buffer, download_buffer);

        /* tell download thread the buffer is empty */
        sem_post(&download_empty);

        /* capitalize it (toupper needs an unsigned char value) */
        for (size_t i = 0; decompress_buffer[i] != '\0'; i++) {
            decompress_buffer[i] = toupper((unsigned char) decompress_buffer[i]);
        }

        /* tell display thread the buffer is full */
        sem_post(&decompress_full);
    }
    return NULL;
}

/* the function called for the third thread, displaying the video */
void* display(void* p) {
    /* loop until told to quit */
    while (!quit) {
        /* wait for the decompress buffer to be full */
        sem_wait(&decompress_full);

        /* display it */
        printf("%s\n", decompress_buffer);

        /* tell the decompress thread it's empty */
        sem_post(&decompress_empty);
    }
    return NULL;
}

int main() {
    /* the threads */
    pthread_t a, b, c;

    /* setup the semaphores to show all buffers empty */
    sem_init(&download_empty, 0, 1);
    sem_init(&download_full, 0, 0);
    sem_init(&decompress_empty, 0, 1);
    sem_init(&decompress_full, 0, 0);

    /* spawn all threads */
    pthread_create(&a, NULL, download, NULL);
    pthread_create(&b, NULL, decompress, NULL);
    pthread_create(&c, NULL, display, NULL);

    /* join all threads */
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    pthread_join(c, NULL);
    return 0;
}
Copyright © 2024 Ian Finlayson | Licensed under a Creative Commons BY-NC-SA 4.0 License.