Parallel MapReduce in Python in Ten Minutes

Almost everyone has heard of Google’s MapReduce framework, but very few have ever hacked around with the idea of map and reduce. These two idioms are borrowed from functional programming, and form the basis of Google’s framework. Although Python is not a functional programming language, it has built-in support for both of these concepts.

A map operation involves taking a function f and applying it on a per-element basis to a given list L. For example, if f was the square-root function, then the map would take the square root of each element in L. A reduce operation (also known as folding) is similar in that it also applies a function g to a given list L, but instead of treating each element in isolation, g systematically accumulates or collapses the contents of L into a single result. The canonical example of this would be g performing a summation of the elements in L.
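
As a minimal sketch of those two definitions (written for Python 3, where reduce has to be imported from functools; the sample list and the names f and g simply mirror the paragraph above):

```python
import math
from functools import reduce

L = [1, 4, 9, 16]

# map: apply f (here the square root) to every element independently
f = math.sqrt
roots = list(map(f, L))

# reduce: fold the whole list into one value with g (here addition)
g = lambda acc, x: acc + x
total = reduce(g, L)

print(roots)  # [1.0, 2.0, 3.0, 4.0]
print(total)  # 30
```

The map result has one output per input, while the reduce result is a single value no matter how long L is.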

Now, there’s a lot of documentation on MapReduce so I won’t talk about it much here. I’ll just briefly mention that it is an efficient way of distributing the workload of huge embarrassingly parallel problems over multiple processors, or over a grid of machines. The map operation takes input which has been fragmented into pieces by a master node, and applies its function to the input on each of the worker nodes in the system. The data is then organized and the useful information extracted by the reduce step on various worker nodes in the system. Google provides a diagram of the operation in their documentation:

Pretty straightforward... right?

Anyways, this post isn’t specifically about MapReduce, it’s about Python. I mentioned above that Python provides built-in map and reduce functionality. Let’s say we have a problem where we have several lists, and we’d like to know the total number of elements in all the lists combined. In Python, a solution (using lambdas for brevity) might look like this:

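# reduce is a builtin in Python 2; in Python 3 it lives in functools.
# This import works in both.
from functools import reduce

a = [1, 2, 3]
b = [4, 5, 6, 7]
c = [8, 9, 1, 2, 3]
# list() is harmless in Python 2 and materializes the iterator in Python 3
L = list(map(lambda x: len(x), [a, b, c]))

# L == [3, 4, 5]
N = reduce(lambda x, y: x+y, L)
# N == 12
# Or, if we want to be fancy and do it in one line
N = reduce(lambda x, y: x+y, map(lambda x: len(x), [a, b, c]))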

Now, this is an interesting way to think about getting the combined lengths of these lists, but it isn’t any more efficient than getting the lengths with any other conventional method. But what if we had a large problem which we could solve in parallel?

Let’s pretend we’re interested in finding the frequencies of title-cased words (proper nouns, starts of sentences) in an enormous corpus, and we’d like to use map and reduce to help compute this problem on our multi-core computer. Most people’s first instinct would be “let’s break out the Python threading module!” which, of course, would be a mistake, thanks to something called the global interpreter lock (GIL) used in the standard CPython interpreter. The GIL allows only one thread to execute Python bytecode at a time, so CPU-bound threads cannot run in parallel across cores. A solution is to use the Python multiprocessing module, which spawns entirely separate processes, each with its own interpreter, to run the same code. And, as you’d expect from Python, it provides you with a super-easy-to-use Pool class to manage your own worker process pool.
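
To make that concrete, here is a small sketch (Python 3) that pushes a CPU-bound function through a multiprocessing Pool. The function cpu_bound, the helper run_in_pool, and the pool size of 4 are placeholders I picked for illustration; they are not part of the word-count program that follows.

```python
from multiprocessing import Pool

def cpu_bound(n):
    # Hypothetical stand-in for real work: sum the squares below n
    return sum(i * i for i in range(n))

def run_in_pool(inputs, processes=4):
    # The inputs are distributed across separate worker processes,
    # so the calls are not serialized by a single interpreter's GIL
    with Pool(processes=processes) as pool:
        return pool.map(cpu_bound, inputs)

if __name__ == "__main__":
    # The guard keeps worker processes from re-running this block
    # on platforms that start workers by importing the main module
    print(run_in_pool([10, 100, 1000]))
```

The function handed to the pool has to be defined at module level so that the worker processes can find it by name.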


Probably the best thing about the Pool class is that it provides a map function of its own, which will automatically partition and distribute input to a user-specified function in a pool of worker processes. This is like map in the functional sense, but it doesn’t necessarily mean it’s performing the map step in MapReduce, as you’ll see. So let’s see some code for the problem I mentioned above; we’ll start with the map, partition, and reduce functions:

"""
Given a list of tokens, return a list of tuples of
titlecased (or proper noun) tokens and a count of '1'.
Also remove any leading or trailing punctuation from
each token.
"""
def Map(L):

  results = []
  for w in L:
    # True if w contains non-alphanumeric characters
    if not w.isalnum():
      w = sanitize (w)

    # True if w is a title-cased token
    if w.istitle():
      results.append ((w, 1))

  return results

"""
Group the sublists of (token, 1) pairs into a term-frequency-list
map, so that the Reduce operation later can work on sorted
term counts. The returned result is a dictionary with the structure
{token : [(token, 1), ...] .. }
"""
def Partition(L):
  tf = {}
  for sublist in L:
    for p in sublist:
      # Append the tuple to the list in the map
      try:
        tf[p[0]].append (p)
      except KeyError:
        tf[p[0]] = [p]
  return tf

"""
Given a (token, [(token, 1) ...]) tuple, collapse all the
count tuples from the Map operation into a single term frequency
number for this token, and return a final tuple (token, frequency).
"""
def Reduce(Mapping):
  return (Mapping[0], sum(pair[1] for pair in Mapping[1]))

So, what’s going on here? Well, there are three steps to this process. First of all, the Map function receives independent lists of tokens from the source text. In parallel, each instance strips any leading or trailing punctuation from a token (the call to sanitize), and if the result is a title-cased word, adds it to the results list with a count of 1 (the istitle check). All tokens are processed in this way, and duplicates are appended to the list as separate entries.
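
As a quick illustration of the string tests the Map step relies on, the sketch below (Python 3) runs a few sample tokens through the same two checks. The helper strip_edges is a hypothetical condensed version of the sanitize function shown later in the post, map_tokens stands in for Map, and the sample tokens are made up.

```python
def strip_edges(w):
    # Drop non-alphanumeric characters from both ends of the token
    while w and not w[0].isalnum():
        w = w[1:]
    while w and not w[-1].isalnum():
        w = w[:-1]
    return w

def map_tokens(tokens):
    results = []
    for w in tokens:
        if not w.isalnum():
            w = strip_edges(w)
        # istitle() wants an uppercase first letter followed by lowercase
        if w.istitle():
            results.append((w, 1))
    return results

tokens = ['"Hello,', "world", "(Python)", "NASA", "Hello"]
pairs = map_tokens(tokens)
print(pairs)  # [('Hello', 1), ('Python', 1), ('Hello', 1)]
```

Note that an all-caps token such as "NASA" does not count as title-cased, and that "Hello" shows up twice because nothing is summed at this stage.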

After the Map operation is complete, the program needs to organize all the processed data. This is where the Partition function comes in. This function receives a huge list of lists – each constituent list was processed by an instance of Map and thus contains a sequence of (token, 1) pairs. This function organizes all the data into a dictionary, so that now each token key has a list of (token, 1) tuples as a value.
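
The grouping is easier to picture on a tiny input. This sketch (Python 3) uses collections.defaultdict in place of the try/except in the post's Partition; that substitution is mine, and the sample data is invented.

```python
from collections import defaultdict

def partition(mapped_chunks):
    # Group every (token, 1) pair under its token
    grouped = defaultdict(list)
    for sublist in mapped_chunks:
        for pair in sublist:
            grouped[pair[0]].append(pair)
    return dict(grouped)

# Two sublists, as if two Map workers had each returned one
mapped_chunks = [
    [("Hello", 1), ("Python", 1)],
    [("Hello", 1)],
]
grouped = partition(mapped_chunks)
print(grouped)
# {'Hello': [('Hello', 1), ('Hello', 1)], 'Python': [('Python', 1)]}
```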

The final step is in the Reduce function. It receives arbitrary key:value pairs from the dictionary built in Partition, which it uses to generate the final term counts for title-cased words. In parallel, the list in each dictionary entry is collapsed, summed, and the final count for each token key is returned by each instance of Reduce. The final result is a list of (token, count) tuples, where count refers to the total count of the token over the entire document.
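
On the same kind of invented data, the reduce step can be sketched like this (Python 3). Here reduce_pair is a hypothetical stand-in for the post's Reduce, and the builtin map stands in for Pool.map so the example runs in a single process.

```python
def reduce_pair(item):
    # item is one (token, [(token, 1), ...]) entry from the dictionary
    token, pairs = item
    return (token, sum(count for _, count in pairs))

grouped = {
    "Hello": [("Hello", 1), ("Hello", 1)],
    "Python": [("Python", 1)],
}

# Pool.map would apply reduce_pair to these items in parallel
term_frequencies = list(map(reduce_pair, grouped.items()))
print(term_frequencies)  # [('Hello', 2), ('Python', 1)]
```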

Let’s get to the Python multiprocessing code, and the other stuff we need to make the above functions work. I’ll create a worker pool of eight processes, and distribute the above workload to those processes via the Pool.map function:

import sys
from multiprocessing import Pool
"""
If a token has been identified to contain
non-alphanumeric characters, such as punctuation,
assume it is leading or trailing punctuation
and trim it off. Other internal punctuation
is left intact.
"""
def sanitize(w):

  # Strip punctuation from the front
  while len(w) > 0 and not w[0].isalnum():
    w = w[1:]

  # Strip punctuation from the back
  while len(w) > 0 and not w[-1].isalnum():
    w = w[:-1]

  return w
"""
Load the contents of the file at the given
path and return it as a list of
whitespace-separated tokens.
"""
def load(path):

  # The with-block closes the file even if reading fails
  with open(path, "r") as f:
    return f.read().split()

"""
A generator function for chopping up a given list into chunks of
length n.
"""
def chunks(l, n):
  for i in range(0, len(l), n):
    yield l[i:i+n]

"""
Sort key: descending term frequency, then alphabetical by token.
"""
def tuple_sort_key(pair):
  return (-pair[1], pair[0])

if __name__ == '__main__':

  if (len(sys.argv) != 2):
    print("Program requires path to file for reading!")
    sys.exit(1)

  # Load the file and split it into a list of tokens
  text = load(sys.argv[1])

  # Build a pool of 8 processes
  pool = Pool(processes=8)

  # Fragment the token list into at most 8 chunks; the ceiling
  # division keeps the chunk size at 1 or more for short inputs
  chunk_size = max(1, -(-len(text) // 8))
  partitioned_text = list(chunks(text, chunk_size))

  # Generate count tuples for title-cased tokens
  single_count_tuples = pool.map(Map, partitioned_text)

  # Organize the count tuples; lists of tuples by token key
  token_to_tuples = Partition(single_count_tuples)

  # Collapse the lists of tuples into total term frequencies
  term_frequencies = pool.map(Reduce, list(token_to_tuples.items()))

  # Shut the worker processes down cleanly
  pool.close()
  pool.join()

  # Sort the term frequencies in nonincreasing order
  term_frequencies.sort(key=tuple_sort_key)

  for pair in term_frequencies[:20]:
    print("%s : %s" % (pair[0], pair[1]))

As you can see, with three calls to the Python multiprocessing.Pool class (the Pool constructor and the two pool.map calls), I can create a worker pool of 8 processes, and perform a distributed map and a distributed reduce operation. Pool.map takes care of the partitioning and distribution of the data over the worker pool by itself. Don’t get confused by the two calls to map though: each one is technically a map operation, but the first call maps the Map function over the data, and the second maps the Reduce function over it.

Finally, it prints out the top twenty token:count pairs. As you can see, the Python multiprocessing code is really easy to use. You don’t need to organize your problem as a MapReduce problem in order to properly use the process pool class either – I just used this opportunity for illustration. This definitely isn’t the most efficient example of how to use map and reduce to solve big problems, but I hope it’s a reasonably clear introduction. Happy Hacking!
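
For comparison, here is one way the same count could be arranged so that less data crosses process boundaries: each worker returns a collections.Counter for its chunk, and the parent merges them. This is a different arrangement from the Map/Partition/Reduce code above, not a port of it. It targets Python 3, and count_chunk, merge_counts, the sample sentence, and the cleanup via str.strip (which only handles ASCII punctuation) are all my own assumptions.

```python
from collections import Counter
from functools import reduce
from multiprocessing import Pool
import string

def count_chunk(tokens):
    # Map step: count title-cased tokens within one chunk
    counts = Counter()
    for w in tokens:
        w = w.strip(string.punctuation)  # ASCII punctuation only
        if w.istitle():
            counts[w] += 1
    return counts

def merge_counts(counters):
    # Reduce step: fold the per-chunk counters into one
    return reduce(lambda a, b: a + b, counters, Counter())

def chunks(seq, n):
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

if __name__ == "__main__":
    text = "Hello world. Hello Python, said Guido. Python is fun".split()
    size = max(1, len(text) // 2)
    with Pool(processes=2) as pool:
        partial = pool.map(count_chunk, list(chunks(text, size)))
    print(merge_counts(partial).most_common(3))
```

Each worker sends back one small Counter instead of a long list of (token, 1) pairs, so there is no separate partition step.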
