Slide 1: CS110 Lecture 12: Semaphores and Multithreading Patterns

Principles of Computer Systems

Winter 2020

Stanford University

Computer Science Department

Instructors: Chris Gregg and Nick Troccoli


Slide 2: CS110 Topic 3: How can we have concurrency within a single process?

Slide 3: Learning About Processes

Introduction to Threads (2/3)

Threads and Mutexes (2/5)

Condition Variables and Semaphores (2/10)

Multithreading Patterns (Today)

Slide 4: Today's Learning Goals

Slide 5: Plan For Today

Slide 6: Plan For Today

Slide 7: Dining Philosophers Problem

A philosopher thinks, then eats, and repeats this three times.  

static void think(size_t id) {
  cout << oslock << id << " starts thinking." << endl << osunlock;
  sleep_for(getThinkTime());
  cout << oslock << id << " all done thinking. " << endl << osunlock;
}

Slide 8: Dining Philosophers Problem

A philosopher thinks, then eats, and repeats this three times.  

static void eat(size_t id, mutex& left, mutex& right) {
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  left.unlock();
  right.unlock();
}

Slide 9: Dining Philosophers Problem

A philosopher thinks, then eats, and repeats this three times.  

static void eat(size_t id, mutex& left, mutex& right, size_t& permits, mutex& permitsLock) {
  waitForPermission(permits, permitsLock);  

  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  
  grantPermission(permits, permitsLock);

  left.unlock();
  right.unlock();
}

Slide 10: Dining Philosophers Problem

To return a permit, increment by 1 and continue

static void grantPermission(size_t& permits, mutex& permitsLock) {
  permitsLock.lock();
  permits++;
  permitsLock.unlock();
}

Slide 11: grantPermission

static void waitForPermission(size_t& permits, mutex& permitsLock) {
  while (true) {
    permitsLock.lock();
    if (permits > 0) break;
    permitsLock.unlock();
    sleep_for(10);
  }
  permits--;
  permitsLock.unlock();
}

Slide 12: waitForPermission

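Problem: this is busy waiting! The thread keeps waking up to re-check permits instead of sleeping until a permit is actually available.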

Slide 13: It would be nice if... someone could let us know when they return their permit. Then we could sleep until that happens.

Slide 14: Plan For Today

A condition variable is a variable that can be shared across threads so that one thread can notify another when something happens. A thread can also use it to wait until another thread notifies it.

 

 

 

 

 

class condition_variable_any {
public:
   void wait(mutex& m);
   template <typename Pred> void wait(mutex& m, Pred pred);
   void notify_one();
   void notify_all();
};
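
To make the interface above concrete, here is a minimal sketch of one thread waiting on a condition variable while another notifies it. The function name waitUntilNotified and the ready flag are hypothetical, chosen only for illustration; the sketch uses std::condition_variable_any from the standard library rather than any course-specific code.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper for illustration: one thread sets a flag and notifies,
// the other sleeps inside cv.wait until the flag is set.
bool waitUntilNotified() {
  std::mutex m;
  std::condition_variable_any cv;
  bool ready = false;     // shared state, only touched while holding m
  bool sawReady = false;  // what the waiter observed after waking up

  std::thread waiter([&] {
    m.lock();
    while (!ready) cv.wait(m);  // releases m while asleep, re-locks it before returning
    sawReady = ready;
    m.unlock();
  });

  std::thread notifier([&] {
    m.lock();
    ready = true;
    cv.notify_all();  // wake any thread blocked in cv.wait
    m.unlock();
  });

  waiter.join();
  notifier.join();
  return sawReady;
}
```

Because the flag is checked while holding the mutex, the waiter cannot miss the notification: either it sees ready already set and never sleeps, or it is asleep inside wait when notify_all runs.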

Full program: here

static void grantPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  m.lock();
  permits++;
  if (permits == 1) cv.notify_all();
  m.unlock();
}


Slide 15: grantPermission

Full program: here

 

 

 

 

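Here's what cv.wait does: it atomically releases the mutex and puts the thread to sleep; once the thread is notified, it re-acquires the mutex before returning.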

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  m.lock();
  while (permits == 0) cv.wait(m);
  permits--;
  m.unlock();
}

Slide 16: waitForPermission

Full program: here

 

 

 

 

 

 

 

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  m.lock();
  // while (permits == 0) cv.wait(m);
  cv.wait(m, [&permits] { return permits > 0; });
  permits--;
  m.unlock();
}

Slide 17: waitForPermission

template <typename Pred>
void condition_variable_any::wait(mutex& m, Pred pred) {
  while (!pred()) wait(m);
}

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  while (permits == 0) cv.wait(m);
  permits--;
}

static void grantPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  permits++;
  if (permits == 1) cv.notify_all();
}
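
The benefit of a lock guard shows most clearly in a function with more than one exit path. The following sketch is an assumption for illustration (tryTakePermit is a hypothetical name, not part of the lecture's program): it takes a permit if one is available and returns immediately otherwise, and the mutex is released on both paths without an explicit unlock.

```cpp
#include <cstddef>
#include <mutex>

// Hypothetical non-blocking variant: take one permit if any are left.
bool tryTakePermit(std::size_t& permits, std::mutex& m) {
  std::lock_guard<std::mutex> lg(m);  // constructor locks m
  if (permits == 0) return false;     // lg's destructor unlocks m on this early return
  permits--;
  return true;                        // and on this return as well
}
```

With manual lock and unlock calls, the early return would need its own unlock, which is easy to forget.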

Slide 18: Lock Guards

Slide 19: Plan For Today

Slide 20: Semaphore

This "permission slip" pattern with signaling is a very common pattern:

Slide 21: Semaphore

A semaphore is a variable type that lets you manage a count of finite resources.

 

class semaphore {
 public:
  semaphore(int value = 0);
  void wait();
  void signal();
  
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

Slide 22: Semaphore

A semaphore is a variable type that lets you manage a count of finite resources.

 

 

 

permits.wait(); // if five other threads currently hold permits, this will block

// only five threads can be here at once

permits.signal(); // if other threads are waiting, a permit will be available
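
As a rough check of the "only five threads can be here at once" claim, the sketch below counts how many threads are inside the guarded region at the same time. The semaphore class here is a minimal stand-in built on the standard library with the same wait/signal interface as this lecture's class; it is an assumption, not the course library. maxConcurrentWorkers is a hypothetical helper name.

```cpp
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Minimal stand-in for the lecture's semaphore (an assumption, not the course library).
class semaphore {
 public:
  explicit semaphore(int value = 0) : value(value) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

// Runs numThreads workers, admitting at most `limit` into the guarded region at once.
// Returns the largest number of workers observed inside the region simultaneously.
int maxConcurrentWorkers(int numThreads, int limit) {
  semaphore permits(limit);
  std::mutex countLock;
  int inside = 0, maxInside = 0;
  std::vector<std::thread> workers;
  for (int i = 0; i < numThreads; i++) {
    workers.emplace_back([&] {
      permits.wait();  // blocks while `limit` other workers hold permits
      {
        std::lock_guard<std::mutex> lg(countLock);
        inside++;
        maxInside = std::max(maxInside, inside);
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(5));  // simulated work
      {
        std::lock_guard<std::mutex> lg(countLock);
        inside--;
      }
      permits.signal();  // return the permit
    });
  }
  for (std::thread& t : workers) t.join();
  return maxInside;
}
```

Calling maxConcurrentWorkers(20, 5) should never report more than 5, regardless of how the threads are scheduled.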

Slide 23: Semaphore - wait

A semaphore is a variable type that lets you manage a count of finite resources.

 

 

 

 

 

void semaphore::wait() {
  lock_guard<mutex> lg(m);
  cv.wait(m, [this]{ return value > 0; });
  value--;
}
class semaphore {
 public:
  semaphore(int value = 0);
  void wait();
  void signal();
  
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

Slide 24: Semaphore - signal

A semaphore is a variable type that lets you manage a count of finite resources.

 

void semaphore::signal() {
  lock_guard<mutex> lg(m);
  value++;
  if (value == 1) cv.notify_all();
}
class semaphore {
 public:
  semaphore(int value = 0);
  void wait();
  void signal();
  
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

Here's our final version of the dining philosophers program.

static void philosopher(size_t id, mutex& left, mutex& right, semaphore& permits) {
  for (size_t i = 0; i < 3; i++) {
    think(id);
    eat(id, left, right, permits);
  }
}

int main(int argc, const char *argv[]) {
  // NEW
  semaphore permits(4);
  
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right), ref(permits));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 25: And Now....We Eat!

eat now relies on the semaphore instead of calling waitForPermission and grantPermission.

 

 

 

 

 

 

Thought Questions:

static void eat(size_t id, mutex& left, mutex& right, semaphore& permits) {
  // NEW
  permits.wait();
  
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  
  // NEW
  permits.signal();
  
  left.unlock();
  right.unlock();
}

Slide 26: And Now....We Eat!

 

 

 

 

 

 

Thought Questions:

static void eat(size_t id, mutex& left, mutex& right, semaphore& permits) {
  permits.wait();
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  permits.signal();
  left.unlock();
  right.unlock();
}

Slide 27: And Now....We Eat!

Question: what would a semaphore initialized with 0 mean?
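
One way to read a semaphore initialized with 0: there are no permits yet, so wait blocks until some other thread calls signal. That turns the semaphore into a "tell me when you're done" mechanism. The sketch below assumes a minimal stand-in semaphore with the lecture's wait/signal interface (not the course library), and waitForResult is a hypothetical name.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Minimal stand-in for the lecture's semaphore (an assumption, not the course library).
class semaphore {
 public:
  explicit semaphore(int value = 0) : value(value) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

// The caller blocks on a zero-initialized semaphore until the worker signals.
int waitForResult() {
  semaphore done(0);  // zero permits: wait() cannot return until someone signals
  int result = 0;
  std::thread worker([&] {
    result = 42;      // produce the result first...
    done.signal();    // ...then announce that it is ready
  });
  done.wait();        // sleeps until the worker has signaled
  int observed = result;
  worker.join();
  return observed;
}
```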

Slide 28: More On Semaphores

Question: what would a semaphore initialized with a negative number mean?

Negative semaphores example (full program here): 

void writer(int i, semaphore &s) {
    cout << oslock << "Sending signal " << i << endl << osunlock;
    s.signal();
}

void read_after_ten(semaphore &s) {
    s.wait();
    cout << oslock << "Got enough signals to continue!" << endl << osunlock;
}

int main(int argc, const char *argv[]) {
    semaphore negSemaphore(-9); // wait() returns only after ten signals raise the count to 1
    thread writers[10];
    for (size_t i = 0; i < 10; i++) {
        writers[i] = thread(writer, i, ref(negSemaphore));
    }
    thread r(read_after_ten, ref(negSemaphore));
    for (thread &t : writers) t.join();
    r.join();
    return 0;
}

Slide 29: More On Semaphores

Slide 30: Plan For Today

Slide 31: Thread Coordination

Slide 32: Plan For Today

Slide 33: Announcements

Midterm This Friday

 

Assignment 5 Out Tomorrow

Slide 34: Mid-Lecture Checkin

We can now answer the following questions:

Slide 35: Plan For Today

Slide 36: Reader-Writer

Let's implement a program that requires thread rendezvous with semaphores.  First, we'll look at a version without semaphores to see why they are necessary.

The full program is right here

static void writer(char buffer[]) {
  cout << oslock << "Writer: ready to write." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8
    char ch = prepareData();
    buffer[i % 8] = ch;
    cout << oslock << "Writer: published data packet with character '" 
         << ch << "'." << endl << osunlock;
  }
}

static void reader(char buffer[]) {
  cout << oslock << "\t\tReader: ready to read." << endl << osunlock;
  for (size_t i = 0; i < 320; i++) { // 320 is 40 cycles around the circular buffer of length 8 
    char ch = buffer[i % 8];
    processData(ch);
    cout << oslock << "\t\tReader: consumed data packet " << "with character '" 
         << ch << "'." << endl << osunlock;
  }
}

int main(int argc, const char *argv[]) {
  char buffer[8];
  thread w(writer, buffer);
  thread r(reader, buffer);
  w.join();
  r.join();
  return 0;
}

Slide 37: Reader-Writer

Slide 38: Reader-Writer

Goal: we must encode resource constraints into our program.

What constraint(s) should we add to our program?

 

How can we model these constraint(s)?

Slide 39: Reader-Writer Constraints

What might this look like in code?
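
A possible shape for the answer, sketched under assumptions: the writer may only fill a slot the reader has already consumed, and the reader may only consume a slot the writer has already filled. Two semaphores can model those two counts. The names emptyBuffers and fullBuffers, the helper readerSeesEverythingInOrder, and the stand-in semaphore class are illustrative choices, not taken from the course code.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// Minimal stand-in for the lecture's semaphore (an assumption, not the course library).
class semaphore {
 public:
  explicit semaphore(int value = 0) : value(value) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

static const int kBufferSize = 8;
static const int kNumItems = 320;  // 40 cycles around the circular buffer

// Returns true if the reader received exactly what the writer sent, in order.
bool readerSeesEverythingInOrder() {
  char buffer[kBufferSize];
  semaphore emptyBuffers(kBufferSize);  // slots the writer is allowed to fill
  semaphore fullBuffers(0);             // slots the reader is allowed to consume
  std::string received;

  std::thread w([&] {
    for (int i = 0; i < kNumItems; i++) {
      emptyBuffers.wait();  // never overwrite data the reader has not consumed
      buffer[i % kBufferSize] = static_cast<char>('a' + i % 26);
      fullBuffers.signal();
    }
  });
  std::thread r([&] {
    for (int i = 0; i < kNumItems; i++) {
      fullBuffers.wait();   // never read a slot the writer has not filled
      received.push_back(buffer[i % kBufferSize]);
      emptyBuffers.signal();
    }
  });
  w.join();
  r.join();

  std::string expected;
  for (int i = 0; i < kNumItems; i++) expected.push_back(static_cast<char>('a' + i % 26));
  return received == expected;
}
```

The writer can run at most eight slots ahead of the reader, and the reader can never get ahead of the writer at all.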

Slide 40: Reader-Writer Constraints

Slide 41: Plan For Today

static const char *kCS110StudentIDsFile = "studentsunets.txt";
int main(int argc, char *argv[]) {
  unordered_set<string> cs110Students;
  readStudentFile(cs110Students, argv[1] != NULL ? argv[1] : kCS110StudentIDsFile);
  map<int, int> processCountMap;
  compileCS110ProcessCountMap(cs110Students, processCountMap);
  publishLeastLoadedMachineInfo(processCountMap);
  return 0;
}

Slide 42: semaphore


Slide 43: semaphore

static const int kMinMythMachine = 51;
static const int kMaxMythMachine = 66;
static void compileCS110ProcessCountMap(const unordered_set<string>& sunetIDs,
                                        map<int, int>& processCountMap) {
  for (int num = kMinMythMachine; num <= kMaxMythMachine; num++) {
    int numProcesses = getNumProcesses(num, sunetIDs);
    if (numProcesses >= 0) {
      processCountMap[num] = numProcesses;
      cout << "myth" << num << " has this many CS110-student processes: " << numProcesses << endl;
    }
  }
}

int getNumProcesses(int num, const unordered_set<std::string>& sunetIDs);

Slide 44: semaphore

 

 

 

 

poohbear@myth61$ time ./myth-buster-sequential 
myth51 has this many CS110-student processes: 62
myth52 has this many CS110-student processes: 133
myth53 has this many CS110-student processes: 116
myth54 has this many CS110-student processes: 90
myth55 has this many CS110-student processes: 117
myth56 has this many CS110-student processes: 64
myth57 has this many CS110-student processes: 73
myth58 has this many CS110-student processes: 92
myth59 has this many CS110-student processes: 109
myth60 has this many CS110-student processes: 145
myth61 has this many CS110-student processes: 106
myth62 has this many CS110-student processes: 126
myth63 has this many CS110-student processes: 317
myth64 has this many CS110-student processes: 119
myth65 has this many CS110-student processes: 150
myth66 has this many CS110-student processes: 133
Machine least loaded by CS110 students: myth51
Number of CS110 processes on least loaded machine: 62
poohbear@myth61$
poohbear@myth61$ time ./myth-buster-sequential 
myth51 has this many CS110-student processes: 59
myth52 has this many CS110-student processes: 135
myth53 has this many CS110-student processes: 112
myth54 has this many CS110-student processes: 89
myth55 has this many CS110-student processes: 107
myth56 has this many CS110-student processes: 58
myth57 has this many CS110-student processes: 70
myth58 has this many CS110-student processes: 93
myth59 has this many CS110-student processes: 107
myth60 has this many CS110-student processes: 145
myth61 has this many CS110-student processes: 105
myth62 has this many CS110-student processes: 126
myth63 has this many CS110-student processes: 314
myth64 has this many CS110-student processes: 119
myth65 has this many CS110-student processes: 156
myth66 has this many CS110-student processes: 144
Machine least loaded by CS110 students: myth56
Number of CS110 processes on least loaded machine: 58
poohbear@myth61$

Slide 45: semaphore

static void countCS110Processes(int num, const unordered_set<string>& sunetIDs,
                                map<int, int>& processCountMap, mutex& processCountMapLock, 
                                semaphore& permits) {
  int count = getNumProcesses(num, sunetIDs);
  if (count >= 0) {
    lock_guard<mutex> lg(processCountMapLock);
    processCountMap[num] = count;
    cout << "myth" << num << " has this many CS110-student processes: " << count << endl;
  }
  permits.signal(on_thread_exit);
}

static void compileCS110ProcessCountMap(const unordered_set<string>& sunetIDs, 
                                        map<int, int>& processCountMap) {  
  vector<thread> threads;
  mutex processCountMapLock;
  semaphore permits(8); // limit the number of threads to the number of CPUs
  for (int num = kMinMythMachine; num <= kMaxMythMachine; num++) {
    permits.wait();
    threads.push_back(thread(countCS110Processes, num, ref(sunetIDs),
                             ref(processCountMap), ref(processCountMapLock), ref(permits)));
  }
  for (thread& t: threads) t.join();
}
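
The same pattern can be tried without the myth machines. The sketch below is a simplified version under stated assumptions: fakeGetNumProcesses is a hypothetical stand-in that returns a fixed formula instead of contacting a server, the semaphore is a minimal stand-in for the lecture's class, and the worker calls a plain signal() as its last statement rather than the course's signal(on_thread_exit).

```cpp
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// Minimal stand-in for the lecture's semaphore (an assumption, not the course library).
class semaphore {
 public:
  explicit semaphore(int value = 0) : value(value) {}
  void wait() {
    std::lock_guard<std::mutex> lg(m);
    cv.wait(m, [this] { return value > 0; });
    value--;
  }
  void signal() {
    std::lock_guard<std::mutex> lg(m);
    value++;
    if (value == 1) cv.notify_all();
  }
 private:
  int value;
  std::mutex m;
  std::condition_variable_any cv;
};

// Hypothetical stand-in for getNumProcesses: no network access, just a fixed formula.
static int fakeGetNumProcesses(int num) { return num * 2; }

static void countProcesses(int num, std::map<int, int>& counts,
                           std::mutex& countsLock, semaphore& permits) {
  int count = fakeGetNumProcesses(num);  // the slow part runs outside the lock
  {
    std::lock_guard<std::mutex> lg(countsLock);
    counts[num] = count;                 // the shared map is only touched under the lock
  }
  permits.signal();                      // hand the permit back so another thread may start
}

std::map<int, int> compileCounts(int minMachine, int maxMachine, int maxThreads) {
  std::map<int, int> counts;
  std::mutex countsLock;
  semaphore permits(maxThreads);
  std::vector<std::thread> threads;
  for (int num = minMachine; num <= maxMachine; num++) {
    permits.wait();  // blocks once maxThreads workers are in flight
    threads.emplace_back(countProcesses, num, std::ref(counts),
                         std::ref(countsLock), std::ref(permits));
  }
  for (std::thread& t : threads) t.join();
  return counts;
}
```

Calling compileCounts(51, 66, 8) fills in all 16 entries while keeping at most 8 workers between wait and signal at any moment.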

Slide 46: semaphore

Slide 47: semaphore

Slide 48: Recap

 

Next time: a trip to the ice cream store