Slide 1: CS110 Lecture 10: Threads and Mutexes

Principles of Computer Systems

Winter 2020

Stanford University

Computer Science Department

Instructors: Chris Gregg and Nick Troccoli

Slide 2: CS110 Topic 3: How can we have concurrency within a single process?

Slide 3: Learning About Processes

Introduction to Threads: 2/3

Mutexes and Condition Variables: Today

Condition Variables and Semaphores: 2/10

Multithreading Patterns: 2/12

Slide 4: Today's Learning Goals

Slide 5: Plan For Today

Slide 6: Plan For Today

A thread is an independent execution sequence within a single process.

Slide 7: Threads

Processes:

Threads:

Slide 8: C++ thread

A thread object can be spawned to run the specified function with the given arguments.

 

To pass objects by reference to a thread, use the ref() function:

void myFunc(int& x, int& y) {...}

thread myThread(myFunc, ref(arg1), ref(arg2));

Slide 9: C++ thread

We can make an array of threads as follows:

// declare array of empty thread handles
thread friends[5];

// Spawn threads
for (size_t i = 0; i < 5; i++) {
    friends[i] = thread(myFunc, arg1, arg2);
}

We can also initialize an array of threads as follows (note the loop by reference):

thread friends[5];
for (thread& currFriend : friends) {
    currFriend = thread(myFunc, arg1, arg2);
}

Slide 10: C++ thread

To wait on a thread to finish, use the .join() method:

thread myThread(myFunc, arg1, arg2);

... // do some work

// Wait for thread to finish (blocks)
myThread.join();

For multiple threads, we must wait on a specific thread one at a time:

thread friends[5];
// spawn here
// now we wait for each to finish
for (size_t i = 0; i < 5; i++) {
    friends[i].join();
}

Slide 11: Thread Safety

A thread-safe function is one that will always execute correctly, even when called concurrently from multiple threads.

Slide 12: Threads Share Memory

static void greeting(size_t& i) {
    cout << oslock << "Hello, world! I am thread " << i << endl << osunlock; 
}
    
static const size_t kNumFriends = 6;
int main(int argc, char *argv[]) {
  cout << "Let's hear from " << kNumFriends << " threads." << endl;  

  thread friends[kNumFriends]; // declare array of empty thread handles

  // Spawn threads
  for (size_t i = 0; i < kNumFriends; i++) {
      friends[i] = thread(greeting, ref(i)); 
  }   

  // Wait for threads
  for (size_t i = 0; i < kNumFriends; i++) {
     friends[i].join();    
  }

  cout << "Everyone's said hello!" << endl;
  return 0;
}
Output:

$ ./friends
Let's hear from 6 threads.
Hello, world! I am thread 2
Hello, world! I am thread 2
Hello, world! I am thread 3
Hello, world! I am thread 5
Hello, world! I am thread 5
Hello, world! I am thread 6
Everyone's said hello!

[Figure: stack diagram for the spawn loop. The main stack holds argc, argv, and i; each of the six created thread stacks holds the args for greeting, which refer to main's i.]

Solution: pass a copy of i (not by reference) so it does not change.

Slide 13: Threads Share Memory

Slide 14: Plan For Today

// images.cc
int main(int argc, const char *argv[]) {
  thread processors[10];
  size_t remainingImages = 250;
  for (size_t i = 0; i < 10; i++)
    processors[i] = thread(process, 101 + i, ref(remainingImages));
  for (thread& proc: processors) proc.join();
  cout << "Images done!" << endl;
  return 0;
}

Slide 15: Thread-Level Parallelism

There is a race condition here!

 

 

 

Slide 16: Thread-Level Parallelism

static void process(size_t id, size_t& remainingImages) {
    while (remainingImages > 0) {
        sleep_for(500);  // simulate "processing image"
        remainingImages--;
        ...
    }
    ...
}
0x0000000000401a9b <+36>:    mov    -0x20(%rbp),%rax
0x0000000000401a9f <+40>:    mov    (%rax),%eax
0x0000000000401aa1 <+42>:    lea    -0x1(%rax),%edx
0x0000000000401aa4 <+45>:    mov    -0x20(%rbp),%rax
0x0000000000401aa8 <+49>:    mov    %edx,(%rax)

Slide 17: Why Test and Decrement Is REALLY NOT Thread-Safe

Slide 18: Plan For Today

Slide 19: Mutex

https://www.flickr.com/photos/ofsmallthings/8220574255

A mutex is a variable type that represents something like a "locked door".

You can lock the door:

- if it's unlocked, you go through the door and lock it

- if it's locked, you wait for it to unlock first

 

If you most recently locked the door, you can unlock the door:

- door is now unlocked, another may go in now

class mutex {
public:
  mutex();        // constructs the mutex to be in an unlocked state
  void lock();    // acquires the lock on the mutex, blocking until it's unlocked
  void unlock();  // releases the lock and wakes up another thread trying to lock it
};

Slide 20: Mutex - Mutual Exclusion

static void process(size_t id, size_t& remainingImages, mutex& counterLock) {
  while (true) {
    counterLock.lock();
    if (remainingImages == 0) {
      counterLock.unlock(); 
      break;
    }
    processImage(remainingImages);
    remainingImages--;
    cout << oslock << "Thread#" << id << " processed an image (" << remainingImages 
     << " remain)." << endl << osunlock;
    counterLock.unlock();
  }
  cout << oslock << "Thread#" << id << " sees no remaining images and exits." 
  << endl << osunlock;
}

// Create single mutex in main, pass by reference

Slide 21: Critical Sections With Mutexes

static void process(size_t id, size_t& remainingImages, mutex& counterLock) {
  while (true) {
    size_t myImage;
    size_t remaining;      // snapshot of the counter, taken while the lock is held
    
    counterLock.lock();    // Start of critical section
    if (remainingImages == 0) {
      counterLock.unlock(); // Rather keep it here, easier to check
      break;
    } else {
      myImage = remainingImages;
      remainingImages--;
      remaining = remainingImages;
      counterLock.unlock(); // end of critical section

      processImage(myImage);
      cout << oslock << "Thread#" << id << " processed an image (" << remaining 
      << " remain)." << endl << osunlock;
    }
    }
  }
  cout << oslock << "Thread#" << id << " sees no remaining images and exits." 
  << endl << osunlock;
}

Slide 22: Critical Sections Can Be Bottlenecks

Slide 23: Problems That Might Arise

Slide 24: Some Types of Mutexes

Slide 25: How Do Mutexes Work?

Slide 26: Plan For Today

Slide 27: Announcements

Midterm Next Friday

Slide 28: Plan For Today

Slide 29: Lecture 11: Multithreading and Condition Variables

static void philosopher(size_t id, mutex& left, mutex& right) {
  for (size_t i = 0; i < 3; i++) {
    think(id);
    eat(id, left, right);
  }
}

int main(int argc, const char *argv[]) {
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 30: Lecture 11: Multithreading and Condition Variables

static void think(size_t id) {
  cout << oslock << id << " starts thinking." << endl << osunlock;
  sleep_for(getThinkTime());
  cout << oslock << id << " all done thinking. " << endl << osunlock;
}

static void eat(size_t id, mutex& left, mutex& right) {
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  left.unlock();
  right.unlock();
}

Slide 31: Lecture 11: Multithreading and Condition Variables

static void eat(size_t id, mutex& left, mutex& right) {
  left.lock();
  sleep_for(5000);  // artificially force off the processor
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  left.unlock();
  right.unlock();
}

Slide 32: Lecture 11: Multithreading and Condition Variables

Slide 33: Lecture 11: Multithreading and Condition Variables

int main(int argc, const char *argv[]) {
  size_t permits = 4;
  mutex forks[5], permitsLock;
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i],
         & right = forks[(i + 1) % 5];
    philosophers[i] = 
        thread(philosopher, i, ref(left), ref(right), ref(permits), ref(permitsLock));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 34: Lecture 11: Multithreading and Condition Variables

static void eat(size_t id, mutex& left, mutex& right, size_t& permits, mutex& permitsLock) {
  waitForPermission(permits, permitsLock); // on next slide
  left.lock(); right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  grantPermission(permits, permitsLock); // on next slide
  left.unlock(); right.unlock();
}

static void philosopher(size_t id, mutex& left, mutex& right,
                        size_t& permits, mutex& permitsLock) {
  for (size_t i = 0; i < kNumMeals; i++) {
    think(id);
    eat(id, left, right, permits, permitsLock);
  }
}

Slide 35: Lecture 11: Multithreading and Condition Variables

static void waitForPermission(size_t& permits, mutex& permitsLock) {
  while (true) {
    permitsLock.lock();
    if (permits > 0) break;
    permitsLock.unlock();
    sleep_for(10);
  }
  permits--;
  permitsLock.unlock();
}

static void grantPermission(size_t& permits, mutex& permitsLock) {
  permitsLock.lock();
  permits++;
  permitsLock.unlock();
}

Slide 36: Lecture 11: Multithreading and Condition Variables

class condition_variable_any {
public:
   void wait(mutex& m);
   template <typename Pred> void wait(mutex& m, Pred pred);
   void notify_one();
   void notify_all();
};

Slide 37: Lecture 11: Multithreading and Condition Variables

int main(int argc, const char *argv[]) {
  size_t permits = 4;
  mutex forks[5], m;
  condition_variable_any cv;
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = 
       thread(philosopher, i, ref(left), ref(right), ref(permits), ref(cv), ref(m));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 38: Lecture 11: Multithreading and Condition Variables

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  while (permits == 0) cv.wait(m);
  permits--;
}

static void grantPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  permits++;
  if (permits == 1) cv.notify_all();
}

Slide 39: Lecture 11: Multithreading and Condition Variables


Slide 40: Lecture 11: Multithreading and Condition Variables

template <typename Pred>
void condition_variable_any::wait(mutex& m, Pred pred) {
  while (!pred()) wait(m);
}

static void waitForPermission(size_t& permits, condition_variable_any& cv, mutex& m) {
  lock_guard<mutex> lg(m);
  cv.wait(m, [&permits] { return permits > 0; });
  permits--;
}

Slide 41: Lecture 11: Multithreading and Condition Variables

Slide 42: Lecture 11: Multithreading and Condition Variables

void semaphore::wait() {
  lock_guard<mutex> lg(m);
  cv.wait(m, [this] { return value > 0; });
  value--;
}
void semaphore::signal() {
  lock_guard<mutex> lg(m);
  value++;
  if (value == 1) cv.notify_all();
}

Slide 43: Lecture 11: Multithreading and Condition Variables

static void philosopher(size_t id, mutex& left, mutex& right, semaphore& permits) {
  for (size_t i = 0; i < 3; i++) {
    think(id);
    eat(id, left, right, permits);
  }
}

int main(int argc, const char *argv[]) {
  semaphore permits(4);
  mutex forks[5];
  thread philosophers[5];
  for (size_t i = 0; i < 5; i++) {
    mutex& left = forks[i], & right = forks[(i + 1) % 5];
    philosophers[i] = thread(philosopher, i, ref(left), ref(right), ref(permits));
  }
  for (thread& p: philosophers) p.join();
  return 0;
}

Slide 44: Lecture 11: Multithreading and Condition Variables

static void eat(size_t id, mutex& left, mutex& right, semaphore& permits) {
  permits.wait();
  left.lock();
  right.lock();
  cout << oslock << id << " starts eating om nom nom nom." << endl << osunlock;
  sleep_for(getEatTime());
  cout << oslock << id << " all done eating." << endl << osunlock;
  permits.signal();
  left.unlock();
  right.unlock();
}

Slide 45: Lecture 11: Multithreading and Condition Variables

Slide 46: Lecture 11: Multithreading and Condition Variables

Slide 47: Recap

 

Next time: more about concurrency directives