
Thoughts on the Go Programming Language

Part of the description of Go from its homepage states that it’s a “compiled language that feels like a dynamically typed, interpreted language”. In my relatively brief experience with the language, that is a statement I can attest to. I feel like an expert in Go could build natively-compiled applications at a rate much closer to Python than to C++ or Java. Part of this has to do with its extensive standard library, and part with the fact that the language is just not particularly complicated. Not to mention that the language’s concurrency model is very intuitive.

My initial interest in Go stemmed from goroutines. These implement concurrency in an extremely simple and highly-abstracted fashion: concurrently-executing functions. There isn’t much more to it than that; goroutines are multiplexed over a pool of host threads which are hidden from the programmer. You can let them spin off on their own and terminate, or synchronize and communicate with them using message-passing in the form of channels. In this sense, concurrency in Go feels a lot like Erlang. Some other cool features include closures and array slices. Technically Go isn’t object-oriented, but you’re able to mimic it by using interfaces and embedding methods into structures. Also, Go is garbage-collected.
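
To make that concrete before the bigger example below, here is roughly the smallest goroutine-plus-channel program I can write. It’s a minimal standalone sketch in plain Go, not part of the square-root example that follows:

/* Illustrative sketch only - spawn one goroutine and
 * wait for its message on a channel */
package main

import "fmt"

func main () {

	/* Channel the goroutine will report back on */
	done := make(chan string)

	/* Spin off a concurrently-executing function */
	go func() {
		done <- "hello from a goroutine" /* message-passing */
	} ()

	/* Receive blocks until the goroutine sends */
	fmt.Println (<-done)
}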

Actually, the features of Go make it feel like a combination of a bunch of different languages. There’s obvious inspiration from C, and some from what feels like Java (interfaces, GC), Erlang (message-passing and concurrency), functional languages in general (closures) and Python. Let’s dive into an example which illustrates a few of the things I’ve mentioned so far.

Here’s the (highly synthetic) situation: We’d like to compute the square roots of a list of floating-point numbers in parallel, using our own hand-rolled function implementing Newton’s Method. We’re going to compute each square root in a separate goroutine and collect each value through a channel dedicated to that goroutine. To make things more interesting, we’ll package a bunch of it up using a closure.

/* Compute and return the square root of x. Note
 * that return type comes after function parameters */
func newtons_method (x float64) float64 {

	/* No parentheses around conditional */
	if x == 0.0 {
		panic ("Divide by zero!") // throw exception
	}

	/* "Initialize" statement - variable type inferred by
 	 * compiler */
	last := float64(0.0)
	y := float64(x / 2.0)

	/* No "while" loops - iterate "for as long as" condition */
	for math.Abs(last - y) > 0.0001 {
		last = y
		y = y - ((y * y - x) / (2 * y))
	}

	return y
}

/* Given a slice of floats, compute their square roots in
 * parallel and print the results. Note variable name
 * precedes the type. */
func compute_sqrts (L []float64) {

	/* Type inferred at compile-time */
	n := len (L)

	/* A "slice" definition - used here as expandable array */
	var chans []chan float64

	/* Fill the slice with channels */
	for i := 0; i < n; i++ {
		chans = append(chans, make(chan float64))
	}

	/* For index, element in the list */
	for i, x := range L {

		/* Copy the range variables so the closures
		 * reference the right values */
		__x := x
		__i := i

		/* Define a quick closure and execute it in parallel
		 * as a goroutine via the go () statement */
		go func() {
			/* Calculate square root */
			var root float64 = newtons_method (float64(__x))
			chans[__i] <- root /* Write result to channel */
			close(chans[__i])  /* Close the channel */
		} ()
	}

	for i := range chans {
		x := <- chans[i] /* Read from channel */
		fmt.Println ("Square root of ", L[i], " is ", x)
	}
}

Things to note:

  1. Yes, there are like ten different ways to declare variables (see the short sketch after this list)
  2. Yes, there are like ten different ways to write for loop conditions (and no while loops)
  3. Yes, opening braces ('{') must go on the same line as the function definition or condition
  4. There is a new allocation directive called make(…) in addition to new(…)
  5. This program is primarily imperative
  6. Even though there is flexibility in how you define variables, they are still very much strongly typed
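
To make points 1, 2 and 4 concrete, here’s a quick sketch of a few of the declaration forms, the for-loop variants, and the make()/new() distinction. It’s plain Go, and the function name is made up for illustration:

/* Illustrative only - a grab-bag of declaration forms,
 * loop forms, and make() vs. new() */
func declaration_zoo () {

	/* Three ways to declare the "same" variable */
	var a int = 5 // explicit type
	var b = 5     // type inferred from the initializer
	c := 5        // short "initialize" form, also inferred

	/* There is only "for" - it plays the role of while too */
	for i := 0; i < c; i++ {
		b += i // C-style loop
	}
	for a < 10 {
		a++ // condition-only loop, i.e. a "while" loop
	}

	/* make() builds slices, maps and channels; new() just
	 * allocates zeroed storage and returns a pointer to it */
	s := make([]float64, 0, 8)
	p := new(float64)

	fmt.Println (a, b, s, *p)
}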

Obviously, the most interesting part of this little program is the go func() { … } () block inside the range loop. This is a function literal, and in Go all function literals are also closures; meaning, variables referenced by the function literal stay in scope for as long as the literal needs them. In addition, this function literal is preceded by go and followed by (). This executes the literal inside a goroutine, which, as discussed earlier, will run concurrently with the main thread and all other goroutines (within reason).

The literal itself calls the newtons_method() function and writes the return value into its goroutine’s channel, then closes the channel to signal that no more values will be sent on it. Since these channels are unbuffered, each send blocks until the main goroutine reads the value out of the corresponding channel in the final loop; only then does the goroutine close its channel and exit.

I mentioned earlier that you can play around with some pseudo-OO in Go, so I’ll give a brief example:

/* Interface definition - any type which
 * implements a newtons_method () method
 * also implements this interface */
type SqrtInterface interface {
	newtons_method ()
}

/* Structure definition - this is also
 * a SqrtInterface type */
type SqrtPair struct {
	x float64
	sqrt_x float64
}

/* A method named newtons_method() bound to
 * pointers of type SqrtPair. It references
 * the members of the 'calling' struct and
 * calls the package-level newtons_method(). */
func (sp *SqrtPair) newtons_method () {

	/* ... */

	sp.sqrt_x = newtons_method (sp.x)
}

/* This does nothing except call member functions
 * of SqrtInterface interface objects */
func compute_for_pair (sp SqrtInterface) {
	sp.newtons_method ()
}

func main () {

	/* Note sp has type *SqrtPair */
	sp := new (SqrtPair)
	sp.x = 17
	compute_for_pair (sp)
	fmt.Println ("Square root of ", sp.x, " is ", sp.sqrt_x)
}

I hope that blows your mind, because it blew mine. Let me illustrate what’s going on:

  • Define an interface type SqrtInterface which is empty except for a single method, newtons_method()
  • Define a structure type SqrtPair with a couple of floats
  • Attach a method called newtons_method() to pointers of type *SqrtPair
  • *SqrtPair now implements the interface SqrtInterface
  • newtons_method() can now be called on an instantiated SqrtPair acting as a SqrtInterface object, and it modifies the struct’s internal members, even though the method wasn’t present in the original structure definition

So obviously there’s a lot of cool stuff happening inside Go. TIOBE ranks Go as the world’s 21st most popular programming language at the time of this post (interpret these rankings however you wish), beating out D, arguably one of its closest competitors, at 33rd. GitHub has a huge pile of Go projects. There’s no doubt that Go is a very powerful language, and it’s even supported by a GCC backend in addition to its standard compiler. It also comes with an enormous standard library, with built-in items like HTTP servers. I like Go a lot, but there are some nagging (and probably petty) issues I have with it, which probably means I won’t be using it as my language of choice in the near future:

  • Although minimalistic, I find the syntax kind of random. While in some sense it can be beneficial to mix the best aspects of Python, Erlang and C syntax together, I think the end result is kind of messy. For example, sliceVar = append(sliceVar, item) vs. mapVar[key] = item
  • I am frustrated that I can define variables ten different ways, including not needing to specify variable type, but I can’t put the type before the variable definition.
  • Similarly, for a language so flexible, I don’t know why it’s required to put braces on opening lines, and wrap one-line loops and if-statements in braces.
  • No templates. How can there be no templates you ask? There just aren’t. I don’t know why.
  • Although technically not object-oriented, you can mix in elements of OO as you see fit. I feel that this can lead to the same mix of paradigms that drives C++ people insane.
  • “Exported” variables and functions are denoted by title-case, getting rid of ‘extern’ and ‘public’ (a quick example follows this list). I don’t know what sort of problem this is trying to solve, but I suspect it can just lead to more confusion.
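
For what it’s worth, here’s what that last point looks like in practice. This is a standalone sketch with a made-up package name, not something from the earlier examples:

/* Illustrative only - identifiers starting with an
 * upper-case letter are exported (visible to other
 * packages); lower-case ones are private to the package */
package geometry

const Pi = 3.14159 // exported

var epsilon = 1e-9 // unexported

/* Exported - callable from outside as geometry.Area() */
func Area (r float64) float64 {
	return scale (Pi * r * r)
}

/* Unexported helper - only visible inside this package */
func scale (x float64) float64 {
	return x * (1.0 - epsilon)
}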

But to end on a positive note, a list of Go features I love playing around with:

  • Goroutines and channels
  • Closures / function literals
  • Huge standard library, including built-in RPC
  • Slices
  • Multiple return values (so good! A quick sketch follows below.)
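
And since multiple return values deserve at least one example, here’s a small sketch of the idiomatic value-plus-error pattern. Again, this is standalone; safe_sqrt is just a name I made up for the sketch:

/* Illustrative only - a function returning both a result
 * and an error value */
package main

import (
	"errors"
	"fmt"
	"math"
)

func safe_sqrt (x float64) (float64, error) {
	if x < 0 {
		return 0, errors.New ("negative input")
	}
	return math.Sqrt (x), nil
}

func main () {

	/* Multiple assignment collects both return values */
	root, err := safe_sqrt (2.0)
	if err != nil {
		panic (err)
	}
	fmt.Println ("Square root of 2 is", root)
}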

Parallel MapReduce in Python in Ten Minutes

Almost everyone has heard of Google’s MapReduce framework, but very few have ever hacked around with the idea of map and reduce. These two idioms are borrowed from functional programming, and form the basis of Google’s framework. Although Python is not a functional programming language, it has built-in support for both of these concepts.

A map operation involves taking a function f and applying it on a per-element basis to a given list L. For example, if f were the square-root function, then the map would take the square root of each element in L. A reduce operation (also known as folding) is similar in that it also applies a function g to a given list L, but instead of operating on each element in isolation, g systematically accumulates or collapses the contents of L into a single result. The canonical example of this would be g performing a summation of the elements in L.

Now, there’s a lot of documentation on MapReduce, so I won’t rehash it much here. I’ll just briefly mention that it is an efficient way of distributing the workload of huge embarrassingly parallel problems over multiple processors, or over a grid of machines. The map operation takes input which has been fragmented into pieces by a master node, and applies its function to the input on each of the worker nodes in the system. The data is then organized and the useful information extracted by the reduce step on various worker nodes in the system. Google provides a diagram of the operation in their documentation.

Pretty straightforward... right?

Anyways, this post isn’t specifically about MapReduce, it’s about Python. I mentioned above that Python provides built-in map and reduce functionality. Let’s say we have a problem where we have several lists, and we’d like to know the total number of elements in all the lists combined. In Python, a solution (using lambdas for brevity) might look like this:

a = [1, 2, 3]
b = [4, 5, 6, 7]
c = [8, 9, 1, 2, 3]
L = map(lambda x:len(x), [a, b, c])

# L == [3, 4, 5]
N = reduce(lambda x, y: x+y, L)
# N == 12
# Or, if we want to be fancy and do it in one line
N = reduce(lambda x, y: x+y, map(lambda x:len(x), [a, b, c]))

Now, this is an interesting way to think about getting the combined lengths of these lists, but it isn’t any more efficient than getting the lengths with any other conventional method. But what if we had a large problem which we could solve in parallel?

Let’s pretend we’re interested in finding the frequencies of title-cased words (proper nouns, starts of sentences) in an enormous corpus, and we’d like to use map and reduce to help compute this problem on our multi-core computer. Most people’s first instinct would be “let’s break out the Python threading module!” which, of course, would be a mistake – thanks to something called the global interpreter lock used in the Python interpreter. The GIL basically handicaps parallelism in the Python interpreter by serializing the execution of user threads, for various complicated reasons. A solution is to use the Python multiprocessing module, which spawns entirely separate processes to execute common code in parallel. And, as you’d expect from Python, it provides you with a super-easy-to-use Pool class to manage your own worker process pool.


Probably the best thing about the Pool class is that it provides a map function of its own – which will automatically partition and distribute input to a user-specified function in a pool of worker processes. This is like map in the functional sense, but it doesn’t necessarily mean it’s performing the map step in MapReduce – you’ll see what I mean. So let’s see some code for the problem I mentioned above; we’ll start with the map, partition, and reduce functions:

"""
Given a list of tokens, return a list of tuples of
titlecased (or proper noun) tokens and a count of '1'.
Also remove any leading or trailing punctuation from
each token.
"""
def Map(L):

  results = []
  for w in L:
    # True if w contains non-alphanumeric characters
    if not w.isalnum():
      w = sanitize (w)

    # True if w is a title-cased token
    if w.istitle():
      results.append ((w, 1))

  return results

"""
Group the sublists of (token, 1) pairs into a term-frequency-list
map, so that the Reduce operation later can work on sorted
term counts. The returned result is a dictionary with the structure
{token : [(token, 1), ...] .. }
"""
def Partition(L):
  tf = {}
  for sublist in L:
    for p in sublist:
      # Append the tuple to the list in the map
      try:
        tf[p[0]].append (p)
      except KeyError:
        tf[p[0]] = [p]
  return tf

"""
Given a (token, [(token, 1) ...]) tuple, collapse all the
count tuples from the Map operation into a single term frequency
number for this token, and return a final tuple (token, frequency).
"""
def Reduce(Mapping):
  return (Mapping[0], sum(pair[1] for pair in Mapping[1]))

So, what’s going on here? Well, there are three steps to this process. First of all, the Map function receives independent lists of tokens from the source text. In parallel, it strips any leading or trailing punctuation from each token (the sanitize() call), and if the token is a properly capitalized word, appends it to the results list with a count of 1. All tokens are processed in this way, and duplicates are also appended to this list.

After the Map operation is complete, the program needs to organize all the processed data. This is where the Partition function comes in. This function receives a huge list of lists – each constituent list was processed by an instance of Map and thus contains a sequence of (token, 1) pairs. This function organizes all the data into a dictionary, so that now each token key has a list of (token, 1) tuples as a value.

The final step is in the Reduce function. It receives arbitrary key:value pairs from the dictionary built in Partition, which it uses to generate the final term counts for title-cased words. In parallel, the list in each dictionary entry is collapsed, summed, and the final count for each token key is returned by each instance of Reduce. The final result is a list of (token, count) tuples, where count refers to the total count of the token over the entire document.

Let’s get to the Python multiprocessing code, and the other stuff we need to make the above functions work. I’ll create a worker pool of eight processes, and distribute the above workload to those processes via the Pool.map function:

import sys
from multiprocessing import Pool
"""
If a token has been identified to contain
non-alphanumeric characters, such as punctuation,
assume it is leading or trailing punctuation
and trim them off. Other internal punctuation
is left intact.
"""
def sanitize(w):

  # Strip punctuation from the front
  while len(w) > 0 and not w[0].isalnum():
    w = w[1:]

  # Strip punctuation from the back
  while len(w) > 0 and not w[-1].isalnum():
    w = w[:-1]

  return w
"""
Load the contents of the file at the given path into
one big string, then return it split into a list of tokens.
"""
def load(path):

  word_list = []
  f = open(path, "r")
  for line in f:
    word_list.append (line)

  # Efficiently concatenate Python string objects
  return (''.join(word_list)).split ()

"""
A generator function for chopping up a given list into chunks of
length n.
"""
def chunks(l, n):
  for i in xrange(0, len(l), n):
    yield l[i:i+n]

"""
Sort tuples by term frequency, and then alphabetically.
"""
def tuple_sort (a, b):
  if a[1] < b[1]:
    return 1
  elif a[1] > b[1]:
    return -1
  else:
    return cmp(a[0], b[0])

if __name__ == '__main__':

  if (len(sys.argv) != 2):
    print "Program requires path to file for reading!"
    sys.exit(1)

  # Load file, stuff it into a string
  text = load (sys.argv[1])

  # Build a pool of 8 processes
  pool = Pool(processes=8,)

  # Fragment the string data into 8 chunks
  partitioned_text = list(chunks(text, len(text) / 8))

  # Generate count tuples for title-cased tokens
  single_count_tuples = pool.map(Map, partitioned_text)

  # Organize the count tuples; lists of tuples by token key
  token_to_tuples = Partition(single_count_tuples)

  # Collapse the lists of tuples into total term frequencies
  term_frequencies = pool.map(Reduce, token_to_tuples.items())

  # Sort the term frequencies in nonincreasing order
  term_frequencies.sort (tuple_sort)

  for pair in term_frequencies[:20]:
    print pair[0], ":", pair[1]

As you can see, with one call to the multiprocessing.Pool constructor and two calls to Pool.map, I can create a worker pool of 8 processes and perform a distributed map and a distributed reduce operation. Pool.map takes care of the partitioning and distribution of the data over the worker pool by itself. Don’t get confused by the two calls to map though – although each is technically a map operation, the first call maps the Map function over the data, and the second maps the Reduce function over the organized data.

Finally, it prints out the top twenty token:count pairs. As you can see, the Python multiprocessing code is really easy to use. You don’t need to organize your problem as a MapReduce problem in order to properly use the process pool class either – I just used this opportunity for illustration. This definitely isn’t the most efficient example of how to use map and reduce to solve big problems, but I hope it’s a reasonably clear introduction. Happy Hacking!