Parallelism in One Line

Published: 2015-05-13

Python has a terrible rep when it comes to its parallel processing capabilities. Ignoring the standard arguments about its threads and the GIL (which are mostly valid), the real problem I see with parallelism in Python isn't a technical one, but a pedagogical one. The common tutorials surrounding Threading and Multiprocessing in Python, while generally excellent, are pretty “heavy.” They start in the intense stuff, and stop before they get to the really good, day-to-day useful parts.

Traditional Example

A quick survey of the top DDG results for “Python threading tutorial” shows that just about every single one of them gives the same Class + Queue based example.

The de-facto, intro to threading/multiprocessing, producer/Consumer example code:

#Example.py
'''
Standard Producer/Consumer Threading Pattern
'''

import time
import threading
import Queue

class Consumer(threading.Thread):
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self._queue = queue

  def run(self):
    while True:
      # queue.get() blocks the current thread until
      # an item is retrieved.
      msg = self._queue.get()
      # Checks if the current message is
      # the "Poison Pill"
      if isinstance(msg, str) and msg == 'quit':
        # if so, exists the loop
        break
      # "Processes" (or in our case, prints) the queue item
      print "I'm a thread, and I received %s!!" % msg
    # Always be friendly!
    print 'Bye byes!'


def Producer():
  # Queue is used to share items between
  # the threads.
  queue = Queue.Queue()

  # Create an instance of the worker
  worker = Consumer(queue)
  # start calls the internal run() method to
  # kick off the thread
  worker.start()

  # variable to keep track of when we started
  start_time = time.time()
  # While under 5 seconds..
  while time.time() - start_time < 5:
    # "Produce" a piece of work and stick it in
    # the queue for the Consumer to process
    queue.put('something at %s' % time.time())
    # Sleep a bit just to avoid an absurd number of messages
    time.sleep(1)

  # This the "poison pill" method of killing a thread.
  queue.put('quit')
  # wait for the thread to close down
  worker.join()


if __name__ == '__main__':
  Producer()

Mmm.. Smell those Java roots.

Now, I don’t want to give the impression that I think the Producer / Consumer way of handling threading/multiprocessing is wrong — because it’s definitely not. In fact it is perfect for many kinds of problems. However, what I do think is that it’s not the most useful for day-to-day scripting.

The Problems (as I see them)

For one, you need a boiler-plate class in order to do anything useful. Secondly, you’ll need to maintain a Queue through which you can pipe objects, and to top if all off, you’ll need methods on both ends of the pipe in order to do the actual work (likely involving another queue if you want to communicate two ways or store results).

More workers, more problems.

From here, next thing you’d likely do is make a pool of those worker classes in order to start squeezing some speed out of your Python. Below is a variation of the example code given in the excellent IBM tutorial on threading. It’s a very common scenario in which you spread the task of retrieving web pages across multiple threads.



#Example2.py
'''
A more realistic thread pool example
'''

import time
import threading
import Queue
import urllib2

class Consumer(threading.Thread):
  def __init__(self, queue):
    threading.Thread.__init__(self)
    self._queue = queue

  def run(self):
    while True:
      content = self._queue.get()
      if isinstance(content, str) and content == 'quit':
        break
      response = urllib2.urlopen(content)
    print 'Bye byes!'


def Producer():
  urls = [
    'http://www.python.org', 'http://www.yahoo.com'
    'http://www.scala.org', 'http://www.google.com'
    # etc..
  ]
  queue = Queue.Queue()
  worker_threads = build_worker_pool(queue, 4)
  start_time = time.time()

  # Add the urls to process
  for url in urls:
    queue.put(url) 
  # Add the poison pillv
  for worker in worker_threads:
    queue.put('quit')
  for worker in worker_threads:
    worker.join()

  print 'Done! Time taken: {}'.format(time.time() - start_time)

def build_worker_pool(queue, size):
  workers = []
  for _ in range(size):
    worker = Consumer(queue)
    worker.start()
    workers.append(worker)
  return workers

if __name__ == '__main__':
  Producer()

Works like a charm, but look at all that code! Now we've got setup methods, lists of threads to keep track of, and worst of all, if you’re anywhere as dead-lock prone as I am, a bunch of join statements to issue. And It only gets more complex from here!

What’s been accomplished so far? A whole lotta nothin. Just about everything in the above code is pure plumbing. It’s boiler-plate-y, It’s error prone (Hell, I even forgot to call task_done() on the queue object while writing this), and it’s a lot of work for little payoff. Luckily, there’s a much better way.

Introducing: Map

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence. e.g.


urls = ['http://www.yahoo.com', 'http://www.reddit.com']
results = map(urllib2.urlopen, urls)

This applies the method urlopen, on each item in the passed in sequence and stores all of the results in a list. It is more or less equivalent to

results = []
for url in urls:
    results.append(urllib2.urlopen(url))

Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.

Why does this matter? Because with the right libraries, map makes running things in parallel completely trivial!

Parallel versions of the map function are provided by two libraries: multiprocessing, and also its little known, but equally fantastic step child: multiprocessing.dummy.

Digression: What’s that? Never heard of the threading clone of multiprocessing library called dummy? I hadn't either until very recently. It has all of ONE sentence devoted to it in the multiprocessing documentation page. And that sentence pretty much boils down to “Oh yeah, and this thing exists.” It’s tragically undersold, I tell you!

Dummy is an exact clone of the multiprocessing module. The only difference is that, whereas multiprocessing works with processes, the dummy module uses threads (which come with all the usual Python limitations). So anything that applies to one, applies to the other. It makes it extremely easy to hop back and forth between the two. Which is especially great for exploratory programming when you’re not quite sure if some framework call is IO or CPU bound.

Getting Started

To access the parallel versions of the map functions the first thing you need to do is import the modules that contain them:

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

and instantiate their Pool objects in the code:

pool = ThreadPool()

This single statement handles everything we did in the seven line build_worker_pool function from example2.py. Namely, It creates a bunch of available workers, starts them up so that they’re ready to do some work, and stores all of them in variable so that they’re easily accessed.

The pool objects take a few parameters, but for now, the only one worth noting is the first one: processes. This sets the number of workers in the pool. If you leave it blank, it will default to the number of Cores in your machine.

In the general case, if you’re using the multiprocessing pool for CPU bound tasks, more cores equals more speed (I say that with a lot of caveats). However, when threading and dealing with network bound stuff, things seem to vary wildly, so it’s a good idea to experiment with the exact size of the pool.

pool = ThreadPool(4) # Sets the pool size to 4

If you run too many threads, you’ll waste more time switching between then than doing useful work, so it’s always good to play around a little bit until you find the sweet spot for the task at hand.

So, now with the pool objects created, and simple parallelism at our fingertips, let’s rewrite the url opener from example2.py!

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
  'http://planet.python.org/',
  'https://wiki.python.org/moin/LocalUserGroups',
  'http://www.python.org/psf/',
  'http://docs.python.org/devguide/',
  'http://www.python.org/community/awards/'
  # etc..
  ]

# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
#close the pool and wait for the work to finish
pool.close()
pool.join()

Look at that! The code that actually does work is all of 4 lines. 3 of which are simple bookkeeping ones. The map call handles everything our previous 40 line example did with ease! For funzies, I timed both approaches as well as different pool sizes.


results = []
for url in urls:
  result = urllib2.urlopen(url)
  results.append(result)

# # ------- VERSUS ------- #


# # ------- 4 Pool ------- #
pool = ThreadPool(4)
results = pool.map(urllib2.urlopen, urls)

# # ------- 8 Pool ------- #

pool = ThreadPool(8)
results = pool.map(urllib2.urlopen, urls)

# # ------- 13 Pool ------- #

pool = ThreadPool(13)
results = pool.map(urllib2.urlopen, urls)

Pretty awesome! And also shows why it’s good to play around a bit with the pool size. Any pool size greater than 9 quickly lead to diminishing returns on my machine.

Real World Example 2:

Thumbnailing thousands of images

Let’s now do something CPU bound! A pretty common task for me at work is manipulating massive image folders. One of those transformations is creating thumbnails. It is ripe for being run in parallel.

The basic single process setup


from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
  return (os.path.join(folder, f)
      for f in os.listdir(folder)
      if 'jpeg' in f)

def create_thumbnail(filename):
  im = Image.open(filename)
  im.thumbnail(SIZE, Image.ANTIALIAS)
  base, fname = os.path.split(filename)
  save_path = os.path.join(base, SAVE_DIRECTORY, fname)
  im.save(save_path)

if __name__ == '__main__':
  folder = os.path.abspath(
    '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
  os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

  images = get_image_paths(folder)

  for image in images:
    create_thumbnail(image)

A little hacked together for example, but in essence, a folder is passed into the program, from that it grabs all of the images in the folder, then finally creates the thumbnails and saves them to their own directory.

On my machine, this took 27.9 seconds to process ~6000 images.

If we replace the for loop with a parallel map call:

from multiprocessing import Pool
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = 'thumbs'

def get_image_paths(folder):
  return (os.path.join(folder, f)
      for f in os.listdir(folder)
      if 'jpeg' in f)

def create_thumbnail(filename):
  im = Image.open(filename)
  im.thumbnail(SIZE, Image.ANTIALIAS)
  base, fname = os.path.split(filename)
  save_path = os.path.join(base, SAVE_DIRECTORY, fname)
  im.save(save_path)

if __name__ == '__main__':
  folder = os.path.abspath(
    '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840')
  os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

  images = get_image_paths(folder)

  pool = Pool()
    pool.map(create_thumbnail, images)
    pool.close()
    pool.join()

5.6 seconds!

That’s a pretty massive speedup for only changing a few lines of code. The production version of this is even faster by splitting cpu and io tasks into their own respective processes and threads — which is usually a recipe for deadlocked code. However, due to the explicit nature of map, and the lack of manual thread management, it feels remarkably easy to mix and match the two in a way that is clean, reliable, and easy to debug.

So there it is. Parallelism in (almost) one line.