#!/usr/bin/env python

import pp
import glob
import gzip
import collections

def googlebooks_counts(filename):
    """The lines in these files have the following format:
    word TAB year TAB match_count TAB volume_count NEWLINE"""
    counts = collections.defaultdict(int)
    for line in gzip.open(filename):
        fields = line.split("\t")
        # Sum the match_count column over all years for each word:
        counts[fields[0]] += int(fields[2])
    return counts
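
# Illustration only (made-up numbers): a line such as
# "analysis\t2000\t345\t123\n" contributes 345 to counts['analysis'];
# googlebooks_counts sums the per-year match counts into one total per word.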

def get_counts(filename):
    """Count distribution of the words in the file. In the MapReduce framework, this is like the "map" function."""
    counts = collections.defaultdict(int)
    for line in open(filename):
        for w in tokenizer(line):
            counts[w] += 1
    return counts

def tokenizer(s):
    return s.strip().lower().split()
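
# Illustration only: tokenizer lowercases and splits on whitespace, so
# punctuation stays attached, e.g. tokenizer("The cat sat.") returns
# ['the', 'cat', 'sat.'].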

def merge_counts(results, d):
    """Merge the results from the jobs. In the MapReduce framework, this is like the "reduce" function."""
    for key, val in d.iteritems():
        results[key] += val
    return results
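
# Illustration only (hypothetical values): folding {'the': 2, 'cat': 1} into a
# results dict currently holding {'the': 3} yields {'the': 5, 'cat': 1};
# repeated calls to merge_counts reduce all the per-file dicts down to one.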

def parallel_wc(filenames,
                map_func=get_counts,
                reduce_func=merge_counts,
                additional_args_for_map_func=(),
                libraries=('collections',),  # Modules needed by map_func, given as strings.
                externalities=(tokenizer,)   # Functions needed by map_func, given as function objects.
                ):

    # Create a local job server; pp autodetects the number of CPUs by default.
    job_server = pp.Server(ppservers=())

    # Build the jobs:
    jobs = []
    for filename in filenames:        
        args_for_map_function = [filename]
        args_for_map_function += additional_args_for_map_func
        args_for_map_function = tuple(args_for_map_function)
        # pp.Server.submit takes (func, args, depfuncs, modules):
        job = job_server.submit(map_func, args_for_map_function, externalities, libraries)
        jobs.append(job)
    print "Built %s jobs; running them ..." % len(jobs)
    
    # Run the first job and use its output as the initial results accumulator
    # (this way we never need to know the type that map_func returns):
    results = jobs[0]()
    print 'Job 1 completed and merged'
    # Run the remaining jobs, merging as we go:
    for job_num, job in enumerate(jobs[1:]):
        # Call the job like a method:
        batch_results = job()
        results = reduce_func(results, batch_results)
        print 'Job %s completed and merged' % (job_num+2)    
        
    # Merged results:
    return results
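
# Sketch (hypothetical, not used below): any extra positional arguments for the
# map function can be passed via additional_args_for_map_func. get_counts_min_len
# and min_len are illustrative names, not part of the original pipeline.
#
# def get_counts_min_len(filename, min_len):
#     counts = collections.defaultdict(int)
#     for line in open(filename):
#         for w in tokenizer(line):
#             if len(w) >= min_len:
#                 counts[w] += 1
#     return counts
#
# counts = parallel_wc(filenames,
#                      map_func=get_counts_min_len,
#                      additional_args_for_map_func=(4,))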

def sequential_wc(filenames, 
                  map_func=get_counts, 
                  reduce_func=merge_counts):
    """Get the counts file-by-file, for comparison with the parallel version."""
    results = collections.defaultdict(int)
    for file_num, filename in enumerate(filenames):
        counts = map_func(filename)
        results = reduce_func(results, counts)
        print 'File %s completed and merged' % (file_num+1)
    return results
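
# Sketch of a timing comparison between the two versions; the time module and
# the call pattern below are illustrative assumptions, not measurements:
#
# import time
# start = time.time()
# parallel_wc(filenames)
# print 'Parallel:   %.1fs' % (time.time() - start)
# start = time.time()
# sequential_wc(filenames)
# print 'Sequential: %.1fs' % (time.time() - start)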
    
        
if __name__ == '__main__':

    # Some POS tagged data:
    filenames = glob.glob('tagged/*.txt')

    # Test parallel:
    counts = parallel_wc(filenames)

    # Test with sequential processing:
    counts = sequential_wc(filenames)

    # Parallel processing of two big Google Books files, using googlebooks_counts as the map_func:
    # gb_filenames = ['googlebooks-eng-all-1gram-20120701-a.gz',
    #                 'googlebooks-eng-all-1gram-20120701-b.gz']
    # parallel_wc(gb_filenames, 
    #             map_func=googlebooks_counts, 
    #             reduce_func=merge_counts, 
    #             libraries=('collections', 'gzip'), 
    #             externalities=())
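
    # Sketch: inspect whichever merged `counts` dict was computed above, e.g.
    # print the 10 most frequent words (illustrative; uncomment to run):
    # for w, c in sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]:
    #     print w, c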
    

