Multithreading (really parallelizing...) Our Script Part 3


Our first coding steps: an intro to the Python multiprocessing module



So when I planned this all out I was thinking I'd go through coding a producer/consumer model using message passing and multiple processes. That was based on my previous experience programming socket-based multiprocessing in C++, but Inter Process Communication (IPC) mechanisms in Python are not the most portable (mostly an issue on Windows). Unfortunately Windows is still the predominant OS out there; fortunately the Python multiprocessing module not only has usable IPC mechanisms that are more portable, it also provides pre-built functionality that makes creating a producer/consumer setup as simple as pie (mmmm.... pie....).

So what is a producer/consumer model? Well, if you read the link, the classical problem has to do with one thread of an application creating items for processing and another (or multiple others) actually performing the processing. The problem involves synchronizing access to a shared buffer, but a much simpler method is to simply not use a shared buffer (if that is an option). So if we look at our png optimizer we can break what it does down into two steps:
  1. Identify all png files to be optimized
  2. Run optimization programs on these files in order
So a great way to do this is to have a producer identifying files (filenames representing work units to be "consumed") and a bunch of consumers running optimization programs on them (i.e. consuming the work units).

Quick look at the multiprocessing module


For in-depth information about this module you'll want to take a look at the official documentation, but I'm going to do a quick run through focusing on what we want from this module.

Why processes over threads? Processes and threads each have a range of advantages and disadvantages. In the case of Python, processes are usually much faster than threads for CPU-bound work, depending on what exactly your code does. This is because Python has what is known as the Global Interpreter Lock, or GIL. The GIL is one big lock that prevents multiple threads from conflicting in inappropriate ways, which also means only one thread can execute Python bytecode at a time. The GIL has a long history that I'm not going to get into, but suffice it to say that no matter what your code does the GIL will never be a factor with multiprocessing (every process has its own GIL), while it might be if you are doing threading. There may be other reasons as well; for example, the first time I ever did multi-process coding it was because I was simulating a loosely coupled (no shared memory) supercomputer so I could develop for it on a regular computer. I suppose I could have done this with threads, but processes seemed more natural: why risk accidentally relying on something a thread has that won't be available on the target platform when you can better simulate that platform with processes, which have identical capabilities to the target?
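If you want to see the GIL in action for yourself, here is a small sketch of my own (the busyWork function and the worker counts are made up for illustration, not part of our script) that runs the same CPU-bound loop with threads and then with processes; on a multi-core machine the process version should finish noticeably faster.

#!/usr/bin/env python
# A rough demonstration of the GIL's effect, not part of our script: the same
# CPU-bound counting loop run with four threads and then with four processes.

import time
import threading
import multiprocessing as mp

def busyWork(n):
    "burn CPU time by counting"
    total = 0
    for i in xrange(n):
        total += i
    return total

def runAll(label, workers):
    "start every worker, wait for them all, and print how long it took"
    start = time.time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print label, time.time() - start

if __name__ == "__main__":
    n = 5000000
    threads = [threading.Thread(target=busyWork, args=(n,)) for i in range(4)]
    processes = [mp.Process(target=busyWork, args=(n,)) for i in range(4)]
    runAll("threads:  ", threads)
    runAll("processes:", processes)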

The multiprocessing module is very powerful and provides multi-processing support at a range of abstraction levels, from the ability to create, manage and communicate between processes any way you desire, to the Pool structure which provides a generic, reusable way of creating and tasking out worker processes. To be perfectly honest I find the Pool structure to be very impressive and I'm not really sure why someone would not make use of it. I mean, sure, you might be able to create a custom interaction scheme that is more performant, but I don't see why that would be necessary for the types of things I would expect to be programming in Python. We are going to make use of Pools for our optimizer script.
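For a sense of what the lower-level end of that range looks like, here is a minimal sketch (mine, not something we will use in our script) of a manually created process communicating over Queue objects; Pool hides all of this bookkeeping for us.

#!/usr/bin/env python
# Sketch of the lower-level multiprocessing API: a manually created Process
# communicating over Queues. This is only to show the building blocks Pool wraps.

import multiprocessing as mp

def worker(inQueue, outQueue):
    "pull numbers off one queue, double them and push the results onto another"
    while True:
        item = inQueue.get()
        if item is None:    # an arbitrary shutdown signal of my own choosing
            break
        outQueue.put(item * 2)

if __name__ == "__main__":
    inQueue = mp.Queue()
    outQueue = mp.Queue()
    proc = mp.Process(target=worker, args=(inQueue, outQueue))
    proc.start()
    for i in range(5):
        inQueue.put(i)
    inQueue.put(None)       # tell the worker to stop
    results = [outQueue.get() for i in range(5)]
    proc.join()
    print results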

Pools: the really interesting part


The Python multiprocessing Pool capability is really powerful and I'm not really sure why you would ever need more than it provides (well, perhaps some way to connect to a remote worker pool using sockets and IP addresses, but that is beyond the current scope). The basics are that you create a Pool of workers and then give them tasks to do. Let's take a look at a really simple example (keep in mind that using Pools from the interactive Python interpreter will almost certainly fail, so save this as a file and then run it from a console to see what happens).

#!/usr/bin/env python

import multiprocessing as mp
import os

def consume(x):
    return (x*2, os.getpid())

pool = mp.Pool()

result = pool.apply(consume,[3])

print os.getpid()
print result

pool.close()
pool.join()


We start off by importing the multiprocessing module (naming it mp to reduce typing) and the os module (so we can use the getpid function for illustration purposes). Then we define a function which I have arbitrarily decided to call consume (you can call it whatever you want). For the Pool methods we'll be using, this function must take only a single argument (which can be a tuple or list or dictionary or whatever, so you can pack lots of data into it; it just has to be packed into a single variable). In this example the function takes in x and returns a tuple containing x*2 as the first element and the process id as the second element. We're going to output quite a few process ids as we work our way through this module, since they are a good way to figure out which processes are doing what.

Then we create our Pool with the Pool function. Running Pool() with no arguments returns a worker pool with one worker process for every logical CPU on the system (what multiprocessing.cpu_count() reports), so a processor that supports hyperthreading ends up with two workers per physical core. You can also specify how many processes you want by passing an integer to the function; for example, if I had wanted only two workers I could have used Pool(2). Then I used the apply method to have one of the worker processes run the consume function on a single data element. 3 was the data item; the reason you need to wrap the value in a sequence as [3] is that apply's second argument is the sequence of positional arguments to pass to the function. I then print the current process id and finally print the result so you can see what happened. Finally I close the pool and join it (join will wait until all the processes in the pool complete before continuing; close must be called before join, and this should eliminate the possibility of zombie processes hanging around in memory, doing nothing).
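As an aside, here is a small self-contained variation of mine (reusing the consume function from above) that shows the default worker count and an explicitly sized pool:

#!/usr/bin/env python
# Illustrative fragment: checking the default worker count and asking for an
# explicit number of workers instead.

import multiprocessing as mp
import os

def consume(x):
    return (x*2, os.getpid())

if __name__ == "__main__":
    print mp.cpu_count()        # how many workers Pool() creates by default
    pool = mp.Pool(2)           # explicitly ask for two worker processes
    # apply's second argument is the sequence of positional arguments,
    # which is why a lone value still has to be wrapped up as [3]
    print pool.apply(consume, [3])
    pool.close()
    pool.join()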

If you save and run this script you'll see that the consume function returns a different process id than the one printed by the main script. This is because the function is being executed in one of the processes in our worker pool.

How to task out worker processes


In the last bit of example code we created a Pool of worker processes and then executed a single task. This isn't all that interesting because even though we ran the task in a different process, we never had two or more processes actually working at the same time. The apply method we used works by sending a single task to a worker process and then waiting until the result is returned, but there are a few more ways that we can task things out as well. To be perfectly honest I'm not totally sure why anyone would ever use apply, as it is basically identical in functionality to, and slower in execution than, simply running the task in the main process to begin with... If, however, we wanted to run a single task but allow our main process to do other things while that task was being completed, we could use apply_async. This method takes the same arguments as apply but returns right away and allows you to do other things. If we were to alter the previous code to use apply_async we could do something like this:

#!/usr/bin/env python

import multiprocessing as mp
import os

def consume(x):
    return (x*2, os.getpid())

def otherStuff():
    return 45/2.0+4

pool = mp.Pool()

result = pool.apply_async(consume,[3])

otherStuff()

print os.getpid()
print result.get()

pool.close()
pool.join()


Of course the other stuff we're doing in the example is trivial and pointless, but hopefully you see the idea. Where apply would actually finish the calculation and directly return the result, when we use apply_async the result may not actually be in yet, so when we get to a point in our application where we actually need it we call the get method on the object returned by apply_async. In the example this allows us to do math in both the main process and a worker process at the same time. We could also have multiple tasks and call apply_async for each of them; this would place the tasks in a queue and they would be sent automatically to the worker processes. I can see a use for apply_async when you have more than one kind of task to accomplish. For example, if I had a number of different consume functions and each only had to run once, or if they potentially had to run many times but I wanted the results organized by data element and not by function, I would probably set up a loop and send a bunch of taskings to apply_async (a rough sketch of that idea follows below). If however, as in the case of our script, we have many data elements to be acted on but only a single function to call against them, there are more powerful methods to get the job done.
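Here is what I mean, as a minimal sketch; the three task functions are placeholders I made up purely for illustration:

#!/usr/bin/env python
# Sketch of tasking out several different functions with apply_async and
# collecting the AsyncResult handles. The task functions are placeholders.

import multiprocessing as mp
import os

def doubleIt(x):
    return (x*2, os.getpid())

def squareIt(x):
    return (x*x, os.getpid())

def negateIt(x):
    return (-x, os.getpid())

if __name__ == "__main__":
    pool = mp.Pool()
    taskings = [(doubleIt, 3), (squareIt, 4), (negateIt, 5)]
    # fire every task off without waiting, keeping the AsyncResult handles
    pending = [pool.apply_async(func, [arg]) for func, arg in taskings]
    pool.close()
    pool.join()
    # everything has finished, so get() returns immediately for each result
    for p in pending:
        print p.get()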

The simplest of these methods is map. map will take a dataset, slice it up into chunks, send these chunks out to the worker processes and wait for them all to finish before returning. Let's look at some code:

#!/usr/bin/env python

import multiprocessing as mp
import os

def consume(x):
    return (x*2, os.getpid())

pool = mp.Pool()

dataset = [1,2,3,4,5,6,7,8,9,0,10,11,12,13,14,15,16,17,18,19,20,21,22,24]

result = pool.map(consume,dataset)

print os.getpid()
print result

pool.close()
pool.join()


Here you can see that we have one function we want to run against a dataset of many data elements. map automatically handles cutting the dataset into chunks, sending those chunks to the worker processes and storing the results as they finish. If you run this you'll see that the results come back in order and, depending on how many cores you have, each process gets approximately the same number of data elements to work on. We could easily use this for our script: we would first generate a list of all png files in the given directory, then create a function that optimizes a single passed-in file, then just hand the dataset and a reference to the function to map and move on with our lives.

The problem is that map sort of assumes that each data element takes about the same amount of processing time and that you already have the dataset all figured out. The processing time issue really only arises in specific circumstances and can be eliminated using an argument to map. map sends out taskings in chunks, and it sends out a new chunk any time a process finishes, so if one process takes a really long time going through a chunk, other processes will do multiple chunks during that time. If however a chunk at the end of the dataset happens to contain data elements that take a really long time to calculate, you can have all the other processes finish and sit idle while a single process grinds through those last few items. This is a concern for us because very large png files and very small ones will each be a single data element, so we could end up with a single core going through a bunch of huge pngs while all the other cores sit idle. We can eliminate this concern by setting the chunksize, which is the number of data elements sent to a worker at a time. If we set the chunksize to one, as in pool.map(consume, dataset, 1), map will only ever task out one data element at a time. This is not always a good idea as it means more communication between processes, but in a case like ours, where a single data element can take a really long time to process, the time spent on extra communication is less than what could be wasted through poor parallelism.

The second issue with map is one that we can't really get around in a nice way: map assumes that we already have our dataset in memory. Our script starts off by scanning through directories to find files and adding them to our dataset. It would be awfully nice if we could already be processing files as we identify them, instead of having only a single core working until the entire scan stage is complete. To do this we could make use of map_async. Like apply_async, map_async is identical to map except that it does not wait until all processing is completed. We could make a loop that identifies a number of files and then sends them to map_async over and over again (we could also do this with apply_async), but we'd end up keeping track of an awful lot of result objects (assuming we care about the results) and the solution wouldn't be all that elegant. Luckily the multiprocessing module has a Pool method that does exactly what we want.
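To make the chunksize point concrete, here is a small variation of my own on the map example; the sleep call is just a stand-in for a data element (a huge png in our case) that takes far longer than the rest:

#!/usr/bin/env python
# Variation on the map example with a chunksize of 1: the other workers keep
# pulling single items while the slow element is being processed, instead of
# one worker being stuck with a whole slow chunk at the end.

import multiprocessing as mp
import os
import time

def consume(x):
    if x == 20:
        time.sleep(2)   # pretend this element is a huge png
    return (x*2, os.getpid())

if __name__ == "__main__":
    pool = mp.Pool()
    dataset = range(25)
    result = pool.map(consume, dataset, 1)  # the third argument is the chunksize
    print result
    pool.close()
    pool.join()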

imap is a version of map which is meant to take an iterable object rather than a static dataset. The simplest iterable object to create in Python is a generator, which is produced by a function that generates more than one value but only one at a time. We can pass a generator to map, but map will exhaust it and calculate all of its values before it starts processing data elements, while imap will only pull data elements from it as they are required to feed the worker processes. There is also a version of imap called imap_unordered which functions the same way but does not guarantee that the results are returned in order. We really have no reason to care whether the results are in order, so we'll use imap_unordered to get any potential speedup it can provide over imap. First though, we'll figure out how to create a generator. It is remarkably easy; we just have to use yield:

#!/usr/bin/env python

def generator():
    for i in range(30):
        print "generating"
        yield i

def printSequence(seq):
    for i in seq:
        print "printing"
        print i

printSequence(generator())



If you run the above code you'll see that "generating" is always printed just above "printing". This is because a generator's calculations are "lazy", meaning they only happen when they have to. If we put a test in the printSequence function that quit when i reached 15, then generator would never run far enough to calculate any numbers above 15. Generators can be acted on just like sequences, but their data is produced on demand instead of all up front. "On demand" means when the data is asked for, not necessarily when it is used for useful processing, which is exactly why we choose imap instead of map. imap, like map, has a chunksize argument, but its default chunksize is 1 since imap is built for circumstances like ours.
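Putting the two pieces together before we touch our real script, here is a minimal sketch (using the same toy consume function from earlier) of a generator feeding imap_unordered; results come back as they complete rather than in submission order:

#!/usr/bin/env python
# A generator feeding imap_unordered: data elements are pulled from the
# generator on demand and results are yielded back in completion order.

import multiprocessing as mp
import os

def consume(x):
    return (x*2, os.getpid())

def generator():
    for i in range(30):
        yield i

if __name__ == "__main__":
    pool = mp.Pool()
    for value, pid in pool.imap_unordered(consume, generator()):
        print value, pid
    pool.close()
    pool.join()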

Our Plan


So our plan can be broken down into three steps:
  1. Write a simple generator that generates all filenames we want to process
  2. Write a function that runs optimization programs on a single filename passed as an argument
  3. Tie these two functions together to process png files in a parallel way
My choice of order for steps 1 and 2 is arbitrary of course but unlike my computer I can only think about one thing at a time.

Previous script:


This is the script as we left it after our refactoring; I will be referencing it below.
#!/usr/bin/env python

import os,traceback,sys

def optimizeImages(directory):
    "run optipng and pngout on all png files located in a directory, recursively"
    path = os.path.normpath(directory)
    if not os.path.isdir(path):
        raise IOError("Directory %s not found" % path)

    #we can use a for loop to continually get more data from a walk object until there is none left
    for currPath, currDirs, currFiles in os.walk(path):
        for file in currFiles:
            #no need for us to test and see if the file is a directory as os.walk is taking care of that for us. Also no need for recursion.
            try:
                if(file.split(".")[-1] == 'png'):
                    print "optipnging " + currPath + '/' + file
                    #we need to append the filename to the path as we haven't changed the current working directory this time
                    os.spawnlp(os.P_WAIT,'optipng','optipng','-o7','-q',currPath+'/'+file)
                    print "pngouting " + currPath + '/' + file
                    os.spawnlp(os.P_WAIT,'pngout','pngout','-q', currPath +'/'+ file)
            except:
                traceback.print_exc()

optimizeImages(os.getcwd())
sys.exit(0)

Step 1


It may not seem obvious, but our generator is pretty much already made for us. If we look at the way we run optimizeImages, we get data from the operating system and walk through it; when we hit a file we are interested in, we run the optimization programs on it. All we have to do is swap out the code that optimizes the files for code that yields the relevant information so it can be passed to a Pool method.

#!/usr/bin/env python

import os,traceback,sys

def generatePNGListing(directory):
    "recursively generate the paths of all png files located in a directory"
    path = os.path.normpath(directory)
    if not os.path.isdir(path):
        raise IOError("Directory %s not found" % path)

    #we can use a for loop to continually get more data from a walk object until there is none left
    for currPath, currDirs, currFiles in os.walk(path):
        for file in currFiles:
            #no need for us to test and see if the file is a directory as os.walk is taking care of that for us. Also no need for recursion.
            if(file.split(".")[-1].lower() == 'png'):
                yield(currPath+"/"+file)

def printSeq(s):
    for i in s:
        print i

printSeq(generatePNGListing("/"))
sys.exit(0)


If you run the above you'll see that it immediately starts printing out the locations of every png file on your entire system (if you're on Windows I believe you'll need to replace "/" with a valid Windows path such as "c:\\"), and that it does not need to wait for generatePNGListing to finish before the printing begins. On an unrelated note, I also added .lower() to the filename test so that poorly named files like "something.PNG" will also be accepted.

Step 2


Step 2 is just as easy: we already have code that optimizes a single image, we just need to extract it from our old script and make it work with the argument we generate above.
 
def optimizePNG(filenameAndPath):
    try:
        os.spawnlp(os.P_WAIT,'optipng','optipng','-o7','-q',filenameAndPath)
        os.spawnlp(os.P_WAIT,'pngout','pngout','-q', filenameAndPath)
        return (True,filenameAndPath,os.getpid())
    except:
        return (False,filenameAndPath,os.getpid(),traceback.format_exc())

Here I've just taken the code we already used and tweaked it a bit. We never really use the filename separately from the path, so I combined them in the previous code and simply use the combination directly here. I've also kept our try/except structure so that if everything goes fine we return True, and if not we return the exception information to the main process. I return the process id only to illustrate how well the parallelism worked. I also return filenameAndPath because, even though we sent this data to the process in the first place, we are using imap_unordered and so we have no way to link a result back to its specific file without including it in the result.

Step 3


Step 3, while we don't have any code to work with yet, is quite simple. We have our generator and we have our consume function, so we just need to tie them together using what we talked about earlier in this tutorial.

#!/usr/bin/env python

import os,traceback,sys
from multiprocessing import Pool

def optimizePNG(filenameAndPath):
    try:
        os.spawnlp(os.P_WAIT,'optipng','optipng','-o7','-q',filenameAndPath)
        os.spawnlp(os.P_WAIT,'pngout','pngout','-q', filenameAndPath)
        return (True,filenameAndPath,os.getpid())
    except:
        return (False,filenameAndPath,os.getpid(),traceback.format_exc())

def generatePNGListing(directory):
    "recursively generate the paths of all png files located in a directory"
    path = os.path.normpath(directory)
    if not os.path.isdir(path):
        raise IOError("Directory %s not found" % path)

    #we can use a for loop to continually get more data from a walk object until there is none left
    for currPath, currDirs, currFiles in os.walk(path):
        for file in currFiles:
            #no need for us to test and see if the file is a directory as os.walk is taking care of that for us. Also no need for recursion.
            if(file.split(".")[-1].lower() == 'png'):
                yield(currPath+"/"+file)

def main(directory):
    pool = Pool() #we'll use the default number of processes
    results = pool.imap_unordered(optimizePNG, generatePNGListing(directory))
    pool.close()
    pool.join()
    successes = []
    failures = []
    for result in results:
        if result[0]:
            successes.append(result)
        else:
            failures.append(result)
    for success in successes:
        print success[1] + " optimized by " + str(success[2])
    for failure in failures:
        print failure[1] + " failed due to exception " + failure[3] + " in process " + str(failure[2])

if __name__ == "__main__":
    main(os.getcwd())



Now our script starts up, creates a pool of workers, tasks them with optimizing each png file under the current working directory the script is run from, and then prints out a list of all the successes and failures with associated data. I've also moved the script's work into an if __name__ == "__main__" block, which makes our module reusable. If you run the script directly the __main__ test will be True; if the script is merely imported, the functions, including main, are available without anything being run, which would allow another application to run the optimization on any arbitrary directory. I've tested this script and it works quite well.
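For example, assuming we saved the finished script as optimizepngs.py (a filename I am choosing just for illustration), another script could reuse it like this:

# Importing does not trigger the __main__ block, so main can be pointed at
# any directory we like. The module name and path here are hypothetical.
import optimizepngs

optimizepngs.main("/path/to/some/website/copy")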

Notes


  • There is a little bit of lost parallelism because the main process isn't scanning for new files while the workers work. I could fix this using map_async and a bunch of other code, but I don't think it is worth it. The performance characteristics I brought up before settling on imap_unordered relate to running our optimization programs, which can take a really, really long time on certain large images (minutes, potentially hours, depending on how big an image you might have), while the lost parallelism we have now is more on the millisecond scale per image.
  • This script will shrink pngs, but it might slow down your computer if used inappropriately. Sure, you can read a smaller png faster and one would expect to decode it faster too. Shrinking existing files in place, however, will lead to a good deal of fragmentation on your drive, and if you have an SSD like me it will wear your drive out much faster (if your OS doesn't support TRIM or your drive doesn't support it well). The intended use of this script is for work just before deploying a package of some sort. So, for example, you could make a local copy of a website, shrink all the pngs and then upload that copy to a web server; or if you created an application, you would shrink the app's png resources before packaging it up. Another use could be if you just have a huge library of png files and you need to make space but aren't concerned about performance.
  • This script can obviously be modified very easily to do things other than optimize pngs. If you have an application that works on a particular type of file, you just change the test for files and the code which actually processes them and it will work. This will be explored more in future tutorials.
  • You may have problems copying and pasting the code examples into an interactive Python shell, but I've tested them all and they do work. Try pasting into a text editor, saving the file and running it (any examples that use a Pool will need to be run this way, as they don't work inside an interactive shell).
  • The final script is available on the downloads page.

