Lab Handout 3: Parallel Processing


Get Started

Clone the lab starter code by using the command below. This command creates a lab3 directory containing the project files.

git clone /afs/ir/class/cs110/repos/lab3/shared lab3

Next, pull up the online lab checkoff and have it open in a browser so you can jot things down as you go.

Problem 1: Analyzing parallel mergesort

Consider the architecturally interesting portion of the mergesort executable, which launches 128 peer processes to cooperatively sort an array of 128 randomly generated numbers.  The implementations of createSharedRandomArray and freeSharedArray are omitted for the time being.  You can also find this code, with comments (omitted below for brevity), in mergesort.cc.

static bool shouldKeepMerging(size_t startIndex, size_t reach, size_t length) {
  return startIndex % reach == 0 && reach <= length;
}

static void repeatedlyMerge(int numbers[], size_t length, size_t startIndex) {
  int *base = numbers + startIndex;
  for (size_t reach = 2; shouldKeepMerging(startIndex, reach, length); reach *= 2) {
    raise(SIGSTOP);
    inplace_merge(base, base + reach / 2, base + reach);
  }
  exit(0);
}

static void createMergers(int numbers[], pid_t workers[], size_t length) {
  for (size_t workerIndex = 0; workerIndex < length; workerIndex++) {
    workers[workerIndex] = fork();
    if (workers[workerIndex] == 0) {
      repeatedlyMerge(numbers, length, workerIndex);
    }
  }
}

static void orchestrateMergers(int numbers[], pid_t workers[], size_t length) {
  size_t step = 1;
  while (step <= length) {

    // Wait for all still-remaining workers to stop or terminate
    for (size_t start = 0; start < length; start += step) {
      waitpid(workers[start], NULL, WUNTRACED);
    }

    // Continue half of the workers
    step *= 2;
    for (size_t start = 0; start < length; start += step) {
      kill(workers[start], SIGCONT);
    }
  }
}

static void mergesort(int numbers[], size_t length) {
  pid_t workers[length];
  createMergers(numbers, workers, length);
  orchestrateMergers(numbers, workers, length);
}

int main(int argc, char *argv[]) {
  for (size_t trial = 1; trial <= kNumTrials; trial++) {
    int *numbers = createSharedRandomArray(kNumElements);    
    mergesort(numbers, kNumElements);

    // Check if the resulting array is in fact sorted
    bool sorted = is_sorted(numbers, numbers + kNumElements);
    cout << "\rTrial #" << setw(5) << setfill('0') << trial << ": ";
    cout << (sorted ? "\033[1;34mSUCCEEDED!\033[0m" : "\033[1;31mFAILED!   \033[0m") << flush;
    freeSharedArray(numbers, kNumElements);
    if (!sorted) { 
      cout << endl << "mergesort is \033[1;31mBROKEN.... please fix!\033[0m" << flush; 
      break; 
    }
  }
  cout << endl;
  return 0;
}

The program presented above is a nod to concurrent programming and to the question of whether parallelism can reduce the asymptotic running time of an algorithm (in this case, mergesort).  We’ll lead you through a series of questions to reinforce your multiprocessing and signal skills and to understand why the asymptotic running time of an algorithm can sometimes be improved in a parallel programming world.

For reasons discussed below, this program works because the address in the numbers variable is cloned across the 128 fork calls, and that particular address maps to the same set of physical addresses in all 128 processes (which is different from what usually happens).  The program successfully sorts any array of length 128 by relying on 128 independent processes.  In a nutshell, the above program works because:

  • All even numbered workers (e.g. workers[0], workers[2], etc.) self-halt, while all odd numbered workers terminate immediately.
  • Once all even numbered workers have self-halted, each is instructed to carry on and call inplace_merge (a C++ standard library algorithm) to potentially update the sequence so that numbers[0] <= numbers[1], numbers[2] <= numbers[3], etc.  In general, inplace_merge(first, mid, last) assumes the two ranges [first, mid) and [mid, last) are already sorted in non-decreasing order, and it leaves the merged result in [first, last).
  • Once all neighboring pairs have been merged into sorted sub-arrays of length 2, workers[0], workers[4], workers[8], etc. all self-halt while workers[2], workers[6], workers[10], etc. all exit.
  • Once all remaining workers self-halt, each is instructed to continue to merge the 64 sorted sub-arrays of length 2 into 32 sorted sub-arrays of length 4.
  • The algorithm continues as above, where half of the remaining workers terminate while the other half continue to repeatedly merge larger and larger sub-arrays until only workers[0] remains, at which point workers[0] does one final merge before exiting.  The end product is a sorted array of length 128, and that's pretty awesome.

Truth be told, the mergesort algorithm we've implemented is of more theoretical interest than practical value.  But it's still a novel example of parallel programming that rings much more relevant and real-world than parent-and-pentuplets-go-to-disney.

Use the following short answer questions to guide the discussion.

  • Why is the raise(SIGSTOP) line within the implementation of repeatedlyMerge necessary?
  • When the implementation of orchestrateMergers executes the step *= 2; line the very first time, all worker processes have either terminated or self-halted.  Explain why that’s guaranteed.
  • The repeatedlyMerge function relies on a reach variable, and the orchestrateMergers function relies on a step variable.  Each of the two variables doubles with each iteration.  What are the two variables accomplishing?
  • Had we replaced the one use of WUNTRACED with a 0, would the overall program still correctly sort an arbitrary array of length 128?  Why or why not?
  • Had we instead replaced the one use of WUNTRACED with WUNTRACED | WNOHANG, would the overall program still correctly sort an arbitrary array of length 128?  Why or why not?
  • Assume the following implementation of orchestrateMergers replaces the original version.  Would the overall program always successfully sort an arbitrary array of length 128?  Why or why not?
static void orchestrateMergers(int numbers[], pid_t workers[], size_t length) {
  for (size_t step = 1; step <= length; step *= 2) {
    for (size_t start = 0; start < length; start += step) {
      int status;
      waitpid(workers[start], &status, WUNTRACED);
      if (WIFSTOPPED(status)) {
        kill(workers[start], SIGCONT);
      }
    }
  }
}
  • Now assume the following implementation of orchestrateMergers replaces the original version.  Note the inner for loop counts down instead of up.  Would the overall program always successfully sort an arbitrary array of length 128?  Why or why not?
static void orchestrateMergers(int numbers[], pid_t workers[], size_t length) {
  for (size_t step = 1; step <= length; step *= 2) {
    for (ssize_t start = length - step; start >= 0; start -= step) {
      int status;
      waitpid(workers[start], &status, WUNTRACED);
      if (WIFSTOPPED(status)) {
        kill(workers[start], SIGCONT);
      }
    }
  }
}

The createSharedRandomArray function (defined in memory.h and memory.cc in your lab3 repo) sets aside space for an array of length integers and seeds it with random numbers.  It does so using the mmap function you've seen in Assignments 1 and 2, and you also saw it if you tried out strace last week during discussion section.

int *createSharedRandomArray(size_t length) {
  // Allocate space for the array
  int *numbers = static_cast<int *>(mmap(NULL, length * sizeof(int), 
    PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0));

  // Fill the array with random numbers
  RandomGenerator rgen;
  for (size_t i = 0; i < length; i++) {
    numbers[i] = rgen.getNextInt(kMinValue, kMaxValue);
  }

  return numbers;
}

The mmap function takes the place of malloc here, because it sets up space not in the heap, but in a separate segment that other processes can see and touch.  MAP_ANONYMOUS means the mapping isn't backed by a file, and MAP_SHARED means writes to it are visible to every process sharing the mapping, including the children created by subsequent forks.

  • Normally virtual address spaces are private and inaccessible to other processes, but that’s clearly not the case here.  Given what we’ve discussed about virtual-to-physical address mapping, what must the operating system do to support this, so that the mergers have shared access to the array but arbitrary, unrelated processes don’t?
  • Virtual memory is one form of virtualization used so that the above program works.  Describe one other form of virtualization you see.
  • Assuming the implementation of inplace_merge is O(n), explain why the running time of our parallel mergesort is O(n) instead of the O(n log n) normally ascribed to the sequential version.  (Your explanation should be framed mathematically; it’s not enough to just say it’s parallel.)