#!/usr/bin/ruby

# Simple code to run streaming map-reduce applications on a single node
# Works on Unix/Unix-like environments with a ruby installation

#Parse the input
require 'optparse'
def parse_inputs(options = Hash.new)
  options[:mapper] = "cat"
  options[:reducer] = "cat"
  options[:shuffler] = "shuffle_single_node.rb"
  options[:inputfile] = nil
  options[:outputfile] = nil
  options[:mapperparams] = []
  options[:reducerparams] = []
  options[:verbose] = nil
  
  optparse = OptionParser.new do |opts|
    opts.banner = "Usage: #{$0} [options]"
    
    opts.on("-m", "--mapper EXECUTABLE", "Use EXECUTABLE as the command line
  mapper; the default is the identity function") do |executable| 
      options[:mapper] = executable
    end
    
    opts.on("-r", "--reducer EXECUTABLE", "Use EXECUTABLE as the command line
    reducer; default is the identity function") do |executable| 
      options[:reducer] = executable
    end
    
    opts.on("-s", "--shuffler EXECUTABLE", "Use EXECUTABLE as the command line
    shuffler; default is shuffle_single_node.rb") do |executable| 
      options[:shuffler] = executable
    end
    
    opts.on("-i", "--inputfile file", "Use FILE as input; default is stdin") do |file| 
      options[:inputfile] = file
    end
    
    opts.on("--mapperparams STRING", "Passes the fields in STRING as
parameters to the mapper") do |string|
      options[:mapperparams] += string.split
    end

    opts.on("--reducerparams STRING", "Passes the fields in STRING as
parameters to the reducer") do |string|
      options[:reducerparams] += string.split
    end
    
    opts.on("-o", "--outputfile file", "Use FILE as output; default is stdout") do |file| 
      options[:outputfile] = file
    end
        
    opts.on("-v", "--verbose", "Display the actual command that gets run") do
      options[:verbose] = true
    end

    opts.on("-h", "--help", "Display this screen") do
      puts opts
      exit
    end
  end
  
  optparse.parse!
  return options
end


options = parse_inputs

#Assemble the shell command
command = "#{options[:mapper]}"
options[:mapperparams].each {|x| command += " #{x}"}
command = command + " < #{options[:inputfile]}" if options[:inputfile]

command = command + " | #{options[:shuffler]}"

command = command + " | #{options[:reducer]}"
options[:reducerparams].each {|x| command += " #{x}"}

command = command + " > #{options[:outputfile]}" if options[:outputfile]
puts "\nExecuting: #{command}" if options[:verbose]

#Run
system command

