Parallel computing using Python

This is a simple tutorial on how to use Python's multiprocessing library to run tasks in parallel across multiple CPUs.

#import the libraries
import pandas as pd
import multiprocessing

The example dataset is samplesTests.csv, which contains testing timestamps and sample IDs; it has 886,413 tests in total.

dfTest = pd.read_csv("./samplesTests.csv")
dfTest.head(10)
TestTime SampleID
0 8/10/2004 1:15 S00802
1 8/10/2004 1:15 S00802
2 8/10/2004 1:16 S00802
3 8/10/2004 1:24 S00802
4 8/10/2004 1:24 S00802
5 8/10/2004 0:46 S96030
6 8/10/2004 0:46 S96030
7 8/10/2004 0:46 S96030
8 8/10/2004 0:46 S96030
9 8/10/2004 4:41 S00619
dfTest.shape
(886413, 2)
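As an aside, pandas can compute per-sample counts directly without multiprocessing; a minimal sketch on a tiny synthetic frame (the column names mirror the dataset above):

```python
import pandas as pd

# tiny synthetic stand-in for samplesTests.csv
df = pd.DataFrame({
    "TestTime": ["8/10/2004 1:15", "8/10/2004 1:15", "8/10/2004 0:46"],
    "SampleID": ["S00802", "S00802", "S96030"],
})

# value_counts gives tests-per-sample in one vectorized call
counts = df["SampleID"].value_counts().to_dict()
print(counts)  # {'S00802': 2, 'S96030': 1}
```

The rest of this tutorial uses the per-sample counting as a simple, easily parallelized workload.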

What I want to achieve here is to count how many tests have been performed for each sample, and I'd like to distribute this work across multiple tasks based on sample IDs. So the first step is to get the list of sample IDs.

samples = dfTest['SampleID'].unique()
samples
array(['S00802', 'S96030', 'S00619', ..., 'S00267', 'S00014', 'S01194'], dtype=object)

Then define a function test_counter(), which is the job that each worker process will perform. It counts the rows of the dataframe subset for a given SampleID and stores that count in a dictionary.

#initiate a test_counts dictionary
test_counts = {}
def test_counter(sample):
    #Subset a dataframe for selected sampleID
    dfTest_ = dfTest[dfTest['SampleID'] == sample]
    #get the rows of the selected subset of the dataframe
    test_count = dfTest_.shape[0]
    #add the count into the test_counts dictionary
    test_counts[sample] = test_count
    return test_counts

Now the parallel computation needs another function, which serves as the task handler. It sets up the pool of worker processes and dispatches the parallel tasks. Pool's map function takes two parameters:
- a function: the task to run
- a list: the inputs for the task function.
For example:

#initiate an instance of Pool
p = multiprocessing.Pool()
#run the parallel tasks
result_L = p.map(test_counter, samples)
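By default, Pool() creates one worker process per available CPU; you can check the CPU count and set the pool size explicitly. A small sketch:

```python
import multiprocessing

# number of CPUs that multiprocessing sees on this machine
n_cpus = multiprocessing.cpu_count()

if __name__ == '__main__':
    print(n_cpus)
    # an explicit pool size instead of the default
    with multiprocessing.Pool(processes=2) as p:
        pass
```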

There is a small wrinkle in this particular parallel computation: result_L ends up as a list of dictionaries, which makes sense, since Pool collects each task's return value (a dictionary) into a list. So I have to merge the list of dictionaries into one large dictionary.

#merge all the dictionaries in the list into a large dictionary
results = {}
for d in result_L:
    results.update(d)
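The same merge can also be written as a single dict comprehension; a self-contained sketch with dummy per-worker results:

```python
# dummy stand-in for the list of dictionaries returned by Pool.map
result_L = [{"S00802": 5}, {"S96030": 4}, {"S00619": 1}]

# merge with an update loop
results = {}
for d in result_L:
    results.update(d)

# equivalent one-liner (later dicts win on duplicate keys, as with update)
results_alt = {k: v for d in result_L for k, v in d.items()}
print(results_alt)  # {'S00802': 5, 'S96030': 4, 'S00619': 1}
```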

Putting it all together, the following is the full code. Save it as test_counts.py and run it with python test_counts.py

import multiprocessing
import pandas as pd

# read in the data
dfTest = pd.read_csv("./samplesTests.csv")

#get the list of all sampleIDs
samples = dfTest['SampleID'].unique()

#initiate a test_counts dictionary
test_counts = {}
def test_counter(sample):
    #Subset a dataframe for selected sampleID
    dfTest_ = dfTest[dfTest['SampleID'] == sample]
    #get the rows of the selected subset of the dataframe
    test_count = dfTest_.shape[0]
    #add the count into the test_counts dictionary
    test_counts[sample] = test_count
    return test_counts

def mp_handler():
    results = {}
    #initiate an instance of Pool
    p = multiprocessing.Pool()
    #run the parallel tasks
    result_L = p.map(test_counter, samples)
    #release the worker processes
    p.close()
    p.join()
    
    #merge all the dictionaries in the list into a large dictionary
    for d in result_L:
        results.update(d)

    for k, v in results.items():
        print(k, v)

if __name__ == '__main__':
    mp_handler()

Note: functionality within the multiprocessing package requires that the __main__ module be importable by the child processes [1]. This means that some examples, such as the Pool examples, will not work in the interactive interpreter (nor in a Jupyter notebook). For example:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
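A minimal sketch that avoids this pitfall: define the task function at module level so workers can import it, and create the Pool under the __main__ guard, then run the file as a script (the names here are illustrative):

```python
import multiprocessing

def square(x):
    # task function at module level, importable by worker processes
    return x * x

def main():
    with multiprocessing.Pool(processes=2) as p:
        print(p.map(square, [1, 2, 3]))  # [1, 4, 9]

if __name__ == '__main__':
    main()
```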

For further reading, see the documentation on communication between processes [2].

References

  1. multiprocessing documentation
  2. Communication Between Processes