Slide 1: CS110 Lecture 11: Condition Variables and Semaphores

Principles of Computer Systems

Winter 2020

Stanford University

Computer Science Department

Instructors: Chris Gregg and Nick Troccoli

Slide 2: CS110 Topic 3: How can we have concurrency within a single process?

Slide 3: Learning About Processes

Introduction to Threads (2/3)

Threads and Mutexes (2/5)

Condition Variables and Semaphores (Today)

Multithreading Patterns (2/12)

Slide 4: Today's Learning Goals

Slide 5: Plan For Today

Slide 6: Plan For Today

// images.cc
int main(int argc, const char *argv[]) {
  thread processors[10];
  size_t remainingImages = 250;
  for (size_t i = 0; i < 10; i++)
    processors[i] = thread(process, 101 + i, ref(remainingImages));
  for (thread& proc: processors) proc.join();
  cout << "Images done!" << endl;
  return 0;
}

Slide 7: Race Conditions and Mutexes

There is a race condition here!

Slide 8: Race Conditions and Mutexes

static void process(size_t id, size_t& remainingImages) {
    while (remainingImages > 0) {
        sleep_for(500);  // simulate "processing image"
        remainingImages--;
        ...
    }
    ...
}

Slide 9: Race Conditions and Mutexes

// gets remainingImages
0x0000000000401a9b <+36>:    mov    -0x20(%rbp),%rax
0x0000000000401a9f <+40>:    mov    (%rax),%eax

// Decrements by 1
0x0000000000401aa1 <+42>:    lea    -0x1(%rax),%edx

// Saves updated value
0x0000000000401aa4 <+45>:    mov    -0x20(%rbp),%rax
0x0000000000401aa8 <+49>:    mov    %edx,(%rax)

Slide 10: Mutex

https://www.flickr.com/photos/ofsmallthings/8220574255

A mutex is a variable type that represents something like a "locked door".

You can lock the door:

- if it's unlocked, you go through the door and lock it

- if it's locked, you wait for it to unlock first

 

If you most recently locked the door, you can unlock the door:

- door is now unlocked, another may go in now

class mutex {
public:
  mutex();        // constructs the mutex to be in an unlocked state
  void lock();    // acquires the lock on the mutex, blocking until it's unlocked
  void unlock();  // releases the lock and wakes up another thread trying to lock it
};
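
As a minimal sketch of the lock/unlock interface above, the following uses std::mutex to protect a shared counter. The helper name countWithMutex and its parameters are hypothetical and chosen only for illustration.

```cpp
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: numThreads threads each add perThread to one shared counter.
size_t countWithMutex(size_t numThreads, size_t perThread) {
  size_t counter = 0;
  std::mutex counterLock;
  std::vector<std::thread> workers;
  for (size_t t = 0; t < numThreads; t++) {
    workers.emplace_back([&counter, &counterLock, perThread] {
      for (size_t i = 0; i < perThread; i++) {
        counterLock.lock();    // blocks while another thread holds the lock
        counter++;             // critical section: load, add, store
        counterLock.unlock();  // lets one waiting thread proceed
      }
    });
  }
  for (std::thread& w : workers) w.join();
  return counter;
}
```

With the lock in place, countWithMutex(8, 10000) returns 80000 on every run. Without it, some increments could be lost.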

Slide 11: Mutex - Mutual Exclusion

Goal: keep critical sections as small as possible to maximize concurrent execution

Note: we don't need to lock around printing out remainingImages - reading a size_t has no risk of corruption

static void process(size_t id, size_t& remainingImages, mutex& counterLock) {
  while (true) {
    size_t myImage;
    
    counterLock.lock();
    if (remainingImages == 0) {
      counterLock.unlock();
      break;
    } else {
      myImage = remainingImages;
      remainingImages--;
      counterLock.unlock();

      processImage(myImage);
      
      cout << oslock << "Thread#" << id << " processed an image (" << remainingImages 
      << " remain)." << endl << osunlock;
    }
  }
  cout << oslock << "Thread#" << id << " sees no remaining images and exits." 
  << endl << osunlock;
}

Slide 12: Critical Sections Can Be Bottlenecks

Hardware provides atomic memory operations to build on top of: e.g. "compare and swap"
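
To illustrate how a lock might be built on compare-and-swap, here is a sketch of a spin lock using std::atomic. SpinLock and countWithSpinLock are hypothetical names. This is a teaching simplification, not how std::mutex is implemented: a real mutex also puts waiting threads to sleep instead of spinning.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical teaching type built on one atomic flag.
class SpinLock {
 public:
  void lock() {
    bool expected = false;
    // Atomically: if locked == false, set it to true and stop; otherwise retry.
    while (!locked.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
      expected = false;  // a failed exchange overwrote expected with the current value
    }
  }
  void unlock() { locked.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked{false};
};

// Hypothetical helper: same shared-counter workload, guarded by the spin lock.
size_t countWithSpinLock(size_t numThreads, size_t perThread) {
  size_t counter = 0;
  SpinLock lock;
  std::vector<std::thread> workers;
  for (size_t t = 0; t < numThreads; t++) {
    workers.emplace_back([&counter, &lock, perThread] {
      for (size_t i = 0; i < perThread; i++) {
        lock.lock();
        counter++;
        lock.unlock();
      }
    });
  }
  for (std::thread& w : workers) w.join();
  return counter;
}
```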

 

Caches add an additional challenge:

Slide 13: How Do Mutexes Work?

Slide 14: Plan For Today

Slide 15: Dining Philosophers Problem

Goal: we must encode resource constraints into our program.

Example: how many philosophers can hold a fork at the same time? One.

How can we encode this into our program?  Let's make a mutex for each fork.

static void philosopher(size_t id, mutex& left, mutex& right) {
  ...
}

int main(int argc, const char *argv[]) {
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 16: Dining Philosophers Problem

A philosopher thinks, then eats, and repeats this three times.  

static void think(size_t id) {
  cout << oslock << id << " starts thinking." << endl << osunlock;
  sleep_for(getThinkTime());
  cout << oslock << id << " all done thinking. " << endl << osunlock;
}

static void eat(size_t id, mutex& left, mutex& right) {
  ...
}

static void philosopher(size_t id, mutex& left, mutex& right) {
  for (size_t i = 0; i < kNumMeals; i++) {
    think(id);
    eat(id, left, right);
  }
}

Slide 17: Dining Philosophers Problem

A philosopher thinks, then eats, and repeats this three times.  

static void eat(size_t id, mutex& left, mutex& right) {
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  left.unlock();
  right.unlock();
}

Slide 18: Dining Philosophers Problem

What if: all philosophers grab their left fork and then go off the CPU?

static void eat(size_t id, mutex& left, mutex& right) {
  left.lock();
  sleep_for(5000);  // artificially force off the processor
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  left.unlock();
  right.unlock();
}
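
For comparison, one standard-library way to avoid this deadlock is std::lock, which acquires several mutexes with a deadlock-avoidance algorithm. This is a different technique from the permits approach the lecture develops next. The sketch below is illustrative only: dine, mealsEaten and mealsLock are hypothetical names, and the thinking and eating delays are omitted.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>

static const size_t kNumPhilosophers = 5;
static const size_t kNumMeals = 3;

static void eat(std::mutex& left, std::mutex& right, size_t& mealsEaten, std::mutex& mealsLock) {
  std::lock(left, right);  // acquires both forks without risk of deadlock
  std::lock_guard<std::mutex> leftGuard(left, std::adopt_lock);    // adopt: already locked
  std::lock_guard<std::mutex> rightGuard(right, std::adopt_lock);  // both unlock at scope exit
  std::lock_guard<std::mutex> lg(mealsLock);
  mealsEaten++;
}

static void philosopher(std::mutex& left, std::mutex& right, size_t& mealsEaten,
                        std::mutex& mealsLock) {
  for (size_t i = 0; i < kNumMeals; i++) eat(left, right, mealsEaten, mealsLock);
}

// Hypothetical driver: returns the total number of meals eaten.
size_t dine() {
  std::mutex forks[kNumPhilosophers], mealsLock;
  std::thread philosophers[kNumPhilosophers];
  size_t mealsEaten = 0;
  for (size_t i = 0; i < kNumPhilosophers; i++) {
    std::mutex& left = forks[i];
    std::mutex& right = forks[(i + 1) % kNumPhilosophers];
    philosophers[i] = std::thread(philosopher, std::ref(left), std::ref(right),
                                  std::ref(mealsEaten), std::ref(mealsLock));
  }
  for (std::thread& p : philosophers) p.join();
  return mealsEaten;
}
```

Every philosopher finishes, so dine() returns 15 (five philosophers, three meals each).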

Slide 19: Food For Thought

When coding with threads, you need to ensure that:

Slide 20: Race Conditions and Deadlock

Slide 21: Plan For Today

Goal: we must encode resource constraints into our program.

Example: how many philosophers can try to eat at the same time? Four.

How can we encode this into our program?

 

What does this look like in code?

Slide 22: Race Conditions and Deadlock

int main(int argc, const char *argv[]) {
  // NEW
  size_t permits = 4;
  mutex permitsLock;
  
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right), ref(permits), ref(permitsLock));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 23: Tickets, Please...

static void philosopher(size_t id, mutex& left, mutex& right,
                        size_t& permits, mutex& permitsLock) {
  for (size_t i = 0; i < kNumMeals; i++) {
    think(id);
    eat(id, left, right, permits, permitsLock);
  }
}

Slide 24: Tickets, Please...

static void eat(size_t id, mutex& left, mutex& right, size_t& permits, mutex& permitsLock) {
  // NEW
  waitForPermission(permits, permitsLock);
  
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  
  // NEW
  grantPermission(permits, permitsLock);
  
  left.unlock();
  right.unlock();
}

Slide 25: Tickets, Please...

static void grantPermission(size_t& permits, mutex& permitsLock) {
  permitsLock.lock();
  permits++;
  permitsLock.unlock();
}

Slide 26: grantPermission

static void waitForPermission(size_t& permits, mutex& permitsLock) {
  while (true) {
    permitsLock.lock();
    if (permits > 0) break;
    permitsLock.unlock();
    sleep_for(10);
  }
  permits--;
  permitsLock.unlock();
}

Slide 27: waitForPermission

Problem: this is busy waiting!

Slide 28: It would be nice if....someone could let us know when they return their permit.  Then, we can sleep until this happens.

Slide 29: Plan For Today

Slide 30: Plan For Today

A condition variable is a variable that can be shared across threads and used by one thread to notify another thread when something happens.  A thread can also use it to wait until it is notified by another thread.

class condition_variable_any {
public:
   void wait(mutex& m);
   template <typename Pred> void wait(mutex& m, Pred pred);
   void notify_one();
   void notify_all();
};
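
A minimal sketch of the wait/notify pattern, using std::condition_variable_any with a plain std::mutex. The function waitForValue and the variables ready, value and observed are hypothetical names for illustration.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns the value the waiting thread observed after it was notified.
int waitForValue() {
  std::mutex m;
  std::condition_variable_any cv;
  bool ready = false;
  int value = 0;
  int observed = -1;

  std::thread waiter([&] {
    m.lock();
    // wait unlocks m while sleeping and reacquires it before returning.
    // The loop rechecks the condition to guard against spurious wakeups.
    while (!ready) cv.wait(m);
    observed = value;
    m.unlock();
  });

  std::thread notifier([&] {
    m.lock();
    value = 110;
    ready = true;  // change the shared state while holding the lock
    m.unlock();
    cv.notify_all();  // wake any thread blocked in wait
  });

  waiter.join();
  notifier.join();
  return observed;
}
```

Because the waiter checks ready while holding m, it behaves correctly whether it runs before or after the notifier.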

 

Idea:

Slide 31: waitForPermission

int main(int argc, const char *argv[]) {
  size_t permits = 4;
  mutex forks[5], m;
  
  // NEW
  condition_variable_any cv;
  
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = 
       thread(philosopher, i, ref(left), ref(right), ref(permits), ref(cv), ref(m));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 32: Condition Variables

static void grantPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  m.lock();
  permits++;
  if (permits == 1) cv.notify_all();
  m.unlock();
}


Slide 33: grantPermission

Here's what cv.wait does:

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  m.lock();
  while (permits == 0) cv.wait(m);
  permits--;
  m.unlock();
}

Slide 34: waitForPermission

template <typename Pred>
void condition_variable_any::wait(mutex& m, Pred pred) {
  while (!pred()) wait(m);
}

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  cv.wait(m, [&permits] { return permits > 0; });
  permits--;
}

Slide 35: Lecture 11: Multithreading and Condition Variables

Slide 36: Plan For Today

Slide 37: Announcements

Midterm This Friday

Slide 38: Plan For Today

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  while (permits == 0) cv.wait(m);
  permits--;
}

static void grantPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  permits++;
  if (permits == 1) cv.notify_all();
}
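
A short sketch of why lock_guard helps: its destructor unlocks the mutex on every exit path, including an exception. updateOrThrow and updateTwice are hypothetical helpers written for this illustration.

```cpp
#include <mutex>
#include <stdexcept>

// Hypothetical helper that may throw while holding the lock.
static void updateOrThrow(std::mutex& m, int& value, bool fail) {
  std::lock_guard<std::mutex> lg(m);                    // locks m here
  if (fail) throw std::runtime_error("update failed");  // lg's destructor still unlocks m
  value++;
}  // lg goes out of scope: m is unlocked on the normal path too

// Calls the helper once with a failure and once without, then reports the value.
int updateTwice() {
  std::mutex m;
  int value = 0;
  try {
    updateOrThrow(m, value, true);
  } catch (const std::runtime_error&) {
    // the exception propagated, but m was released on the way out
  }
  updateOrThrow(m, value, false);  // would block forever if the first call left m locked
  return value;
}
```

With manual lock() and unlock() calls, the throwing path would need its own unlock() to avoid leaving the mutex locked.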

Slide 39: Lock Guards

Slide 40: Lecture 11: Multithreading and Condition Variables

void semaphore::wait() {
  lock_guard<mutex> lg(m);
  cv.wait(m, [this] { return value > 0; });
  value--;
}
void semaphore::signal() {
  lock_guard<mutex> lg(m);
  value++;
  if (value == 1) cv.notify_all();
}
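
The two methods above need a class to live in. The following is a sketch of one possible shape, under the assumption that the class holds an int value, a mutex and a condition_variable_any. It is named Semaphore here to mark it as an illustration, not the course's own class. The hypothetical driver signalsSeenByWaiter starts the value at -9, so a waiter gets through only after ten signals.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Assumed layout: a counter plus the mutex and condition variable that guard it.
class Semaphore {
 public:
  explicit Semaphore(int initial = 0) : value(initial) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });  // sleeps until a permit is available
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();  // a permit just became available
  }

 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

// Hypothetical driver: how many signals had been sent when the waiter woke up?
int signalsSeenByWaiter() {
  Semaphore s(-9);  // ten signals are needed to bring the value up to 1
  std::atomic<int> sent{0};
  int seen = 0;
  std::thread waiter([&] {
    s.wait();
    seen = sent.load();
  });
  std::vector<std::thread> signalers;
  for (int i = 0; i < 10; i++) {
    signalers.emplace_back([&] {
      sent++;  // count the signal before sending it
      s.signal();
    });
  }
  for (std::thread& t : signalers) t.join();
  waiter.join();
  return seen;
}
```

The waiter cannot proceed until the tenth signal arrives, so signalsSeenByWaiter() returns 10.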

Slide 41: Lecture 11: Multithreading and Condition Variables

static void philosopher(size_t id, mutex& left, mutex& right, semaphore& permits) {
  for (size_t i = 0; i < 3; i++) {
    think(id);
    eat(id, left, right, permits);
  }
}

int main(int argc, const char *argv[]) {
  semaphore permits(4);
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right), ref(permits));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 42: Lecture 11: Multithreading and Condition Variables

static void eat(size_t id, mutex& left, mutex& right, semaphore& permits) {
  permits.wait();
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  permits.signal();
  left.unlock();
  right.unlock();
}

Slide 43: Lecture 11: Multithreading and Condition Variables

Slide 44: semaphore

permits.wait(); // if five other threads currently hold permits, this will block

// only five threads can be here at once

permits.signal(); // if other threads are waiting, a permit will be available

Slide 45: semaphore

Slide 46: semaphore

 

void writer(int i, semaphore &s) {
    cout << oslock << "Sending signal " << i << endl << osunlock;
    s.signal();
}

void read_after_ten(semaphore &s) {
    s.wait();
    cout << oslock << "Got enough signals to continue!" << endl << osunlock;
}

int main(int argc, const char *argv[]) {
    semaphore negSemaphore(-9);
    thread writers[10];
    for (size_t i = 0; i < 10; i++) {
        writers[i] = thread(writer, i, ref(negSemaphore));
    }
    thread r(read_after_ten, ref(negSemaphore));
    for (thread &t : writers) t.join();
    r.join();
    return 0;
}

Slide 47: semaphore

Slide 48: Plan For Today

Slide 49: semaphore

static void writer(char buffer[]) {
  cout << oslock << "Writer: ready to write." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8
    char ch = prepareData();
    buffer[i % 8] = ch;
    cout << oslock << "Writer: published data packet with character '" 
         << ch << "'." << endl << osunlock;
  }
}

static void reader(char buffer[]) {
  cout << oslock << "\t\tReader: ready to read." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8 
    char ch = buffer[i % 8];
    processData(ch);
    cout << oslock << "\t\tReader: consumed data packet " << "with character '" 
         << ch << "'." << endl << osunlock;
  }
}

int main(int argc, const char *argv[]) {
  char buffer[8];
  thread w(writer, buffer);
  thread r(reader, buffer);
  w.join();
  r.join();
  return 0;
}

Slide 50: semaphore

Slide 51: Plan For Today

Slide 52: semaphore

int main(int argc, const char *argv[]) {
  char buffer[8];
  semaphore fullBuffers, emptyBuffers(8);
  thread w(writer, buffer, ref(fullBuffers), ref(emptyBuffers));
  thread r(reader, buffer, ref(fullBuffers), ref(emptyBuffers));
  w.join();
  r.join();
  return 0;
}

Slide 53: semaphore

static void writer(char buffer[], semaphore& full, semaphore& empty) {
  cout << oslock << "Writer: ready to write." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8
    char ch = prepareData();
    empty.wait();   // don't try to write to a slot unless you know it's empty                                                                                                         
    buffer[i % 8] = ch;
    full.signal();  // signal reader there's more stuff to read                                                                                                                        
    cout << oslock << "Writer: published data packet with character '" 
         << ch << "'." << endl << osunlock;
  }
}

static void reader(char buffer[], semaphore& full, semaphore& empty) {
  cout << oslock << "\t\tReader: ready to read." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8
    full.wait();    // don't try to read from a slot unless you know it's full                                                                                                         
    char ch = buffer[i % 8];
    empty.signal(); // signal writer there's a slot that can receive data                                                                                                              
    processData(ch);
    cout << oslock << "\t\tReader: consumed data packet " << "with character '" 
         << ch << "'." << endl << osunlock;
  }
}
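
To check the handshake end to end, here is a self-contained sketch that pushes a string through an 8-slot circular buffer and returns what the reader received. The Semaphore class is an assumed stand-in for the course's semaphore, and transfer is a hypothetical driver. The simulated prepareData and processData delays are omitted.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Assumed stand-in for the lecture's semaphore, built from a mutex and a condition variable.
class Semaphore {
 public:
  explicit Semaphore(int initial = 0) : value(initial) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }

 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

static const size_t kBufferSize = 8;

static void writer(const std::string& data, char buffer[], Semaphore& full, Semaphore& empty) {
  for (size_t i = 0; i < data.size(); i++) {
    empty.wait();                       // wait for a free slot
    buffer[i % kBufferSize] = data[i];
    full.signal();                      // one more slot holds unread data
  }
}

static void reader(size_t count, std::string& out, char buffer[], Semaphore& full,
                   Semaphore& empty) {
  for (size_t i = 0; i < count; i++) {
    full.wait();                        // wait for unread data
    out += buffer[i % kBufferSize];
    empty.signal();                     // the slot can be overwritten now
  }
}

// Hypothetical driver: sends data through the buffer and returns what arrived.
std::string transfer(const std::string& data) {
  char buffer[kBufferSize];
  Semaphore full(0), empty(static_cast<int>(kBufferSize));
  std::string received;
  std::thread w(writer, std::cref(data), buffer, std::ref(full), std::ref(empty));
  std::thread r(reader, data.size(), std::ref(received), buffer, std::ref(full), std::ref(empty));
  w.join();
  r.join();
  return received;
}
```

The reader receives the characters in the order they were written, even when the input is longer than the buffer.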

Slide 54: semaphore

static const char *kCS110StudentIDsFile = "studentsunets.txt";
int main(int argc, char *argv[]) {
  unordered_set<string> cs110Students;
  readStudentFile(cs110Students, argv[1] != NULL ? argv[1] : kCS110StudentIDsFile);
  map<int, int> processCountMap;
  compileCS110ProcessCountMap(cs110Students, processCountMap);
  publishLeastLoadedMachineInfo(processCountMap);
  return 0;
}

Slide 55: semaphore

Slide 56: semaphore

static const int kMinMythMachine = 51;
static const int kMaxMythMachine = 66;
static void compileCS110ProcessCountMap(const unordered_set<string>& sunetIDs,
                                        map<int, int>& processCountMap) {
  for (int num = kMinMythMachine; num <= kMaxMythMachine; num++) {
    int numProcesses = getNumProcesses(num, sunetIDs);
    if (numProcesses >= 0) {
      processCountMap[num] = numProcesses;
      cout << "myth" << num << " has this many CS110-student processes: " << numProcesses << endl;
    }
  }
}

int getNumProcesses(int num, const unordered_set<std::string>& sunetIDs);

Slide 57: semaphore

poohbear@myth61$ time ./myth-buster-sequential 
myth51 has this many CS110-student processes: 62
myth52 has this many CS110-student processes: 133
myth53 has this many CS110-student processes: 116
myth54 has this many CS110-student processes: 90
myth55 has this many CS110-student processes: 117
myth56 has this many CS110-student processes: 64
myth57 has this many CS110-student processes: 73
myth58 has this many CS110-student processes: 92
myth59 has this many CS110-student processes: 109
myth60 has this many CS110-student processes: 145
myth61 has this many CS110-student processes: 106
myth62 has this many CS110-student processes: 126
myth63 has this many CS110-student processes: 317
myth64 has this many CS110-student processes: 119
myth65 has this many CS110-student processes: 150
myth66 has this many CS110-student processes: 133
Machine least loaded by CS110 students: myth51
Number of CS110 processes on least loaded machine: 62
poohbear@myth61$
poohbear@myth61$ time ./myth-buster-sequential 
myth51 has this many CS110-student processes: 59
myth52 has this many CS110-student processes: 135
myth53 has this many CS110-student processes: 112
myth54 has this many CS110-student processes: 89
myth55 has this many CS110-student processes: 107
myth56 has this many CS110-student processes: 58
myth57 has this many CS110-student processes: 70
myth58 has this many CS110-student processes: 93
myth59 has this many CS110-student processes: 107
myth60 has this many CS110-student processes: 145
myth61 has this many CS110-student processes: 105
myth62 has this many CS110-student processes: 126
myth63 has this many CS110-student processes: 314
myth64 has this many CS110-student processes: 119
myth65 has this many CS110-student processes: 156
myth66 has this many CS110-student processes: 144
Machine least loaded by CS110 students: myth56
Number of CS110 processes on least loaded machine: 58
poohbear@myth61$

Slide 58: semaphore

static void countCS110Processes(int num, const unordered_set<string>& sunetIDs,
                                map<int, int>& processCountMap, mutex& processCountMapLock, 
                                semaphore& permits) {
  int count = getNumProcesses(num, sunetIDs);
  if (count >= 0) {
    lock_guard<mutex> lg(processCountMapLock);
    processCountMap[num] = count;
    cout << "myth" << num << " has this many CS110-student processes: " << count << endl;
  }
  permits.signal(on_thread_exit);
}

static void compileCS110ProcessCountMap(const unordered_set<string>& sunetIDs, 
                                        map<int, int>& processCountMap) {  
  vector<thread> threads;
  mutex processCountMapLock;
  semaphore permits(8); // limit the number of threads to the number of CPUs
  for (int num = kMinMythMachine; num <= kMaxMythMachine; num++) {
    permits.wait();
    threads.push_back(thread(countCS110Processes, num, ref(sunetIDs),
                             ref(processCountMap), ref(processCountMapLock), ref(permits)));
  }
  for (thread& t: threads) t.join();
}
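
The throttling pattern above can be sketched in a self-contained form. Everything here is an assumption made for illustration: the Semaphore class stands in for the course's semaphore, fakeCount replaces getNumProcesses, and the worker calls signal() as its last statement instead of using signal(on_thread_exit). The sketch also records the largest number of workers that were active at once.

```cpp
#include <algorithm>
#include <condition_variable>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Assumed stand-in for the lecture's semaphore.
class Semaphore {
 public:
  explicit Semaphore(int initial = 0) : value(initial) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }

 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

// Hypothetical replacement for getNumProcesses: no network access, just arithmetic.
static int fakeCount(int num) { return num * 2; }

struct Result {
  std::map<int, int> counts;
  int maxConcurrent;
};

Result compileCounts(int first, int last, int maxThreads) {
  Result result{{}, 0};
  std::mutex lock;  // guards result and running
  Semaphore permits(maxThreads);
  int running = 0;
  std::vector<std::thread> threads;
  for (int num = first; num <= last; num++) {
    permits.wait();  // blocks until fewer than maxThreads workers hold permits
    threads.emplace_back([&, num] {
      {
        std::lock_guard<std::mutex> lg(lock);
        running++;
        result.maxConcurrent = std::max(result.maxConcurrent, running);
      }
      int count = fakeCount(num);
      {
        std::lock_guard<std::mutex> lg(lock);
        result.counts[num] = count;
        running--;
      }
      permits.signal();  // return the permit so the next thread can be created
    });
  }
  for (std::thread& t : threads) t.join();
  return result;
}
```

Calling compileCounts(51, 66, 8) fills in all 16 entries while never having more than 8 workers between acquiring and returning a permit.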

Slide 59: semaphore

Slide 60: semaphore

Slide 61: Recap

 

Next time: more threads