Documentation for Parallelize

andrzeng edited this page Sep 5, 2022 · 15 revisions

Frequently in our line of work, for loops take longer than needed to run because they do not take advantage of every processor core. For example, consider the following loop:

processed_data = []
for subject in census:
    subject_data = some_function(subject)
    processed_data.append(subject_data)

Snippets like this pop up frequently in neurophysiological work, e.g. when running masking operations. If the subject count runs into the thousands, we are left twiddling our thumbs and making small talk for a while, because Python does not automatically recruit multiple cores. When we have over 100 cores at our disposal, we are not making the most of our resources! This is where the Parallelize utility comes into play: it enlists the help of every available core while reducing code clutter. With Parallelize, the above loop reduces to:

processed_data = Parallelize(census, some_function)

Significant time savings are achieved this way. Empirically, we have reduced the runtime for a loop from 7 minutes to less than 20 seconds. Let's dive into the details.

Below is the full implementation of the Parallelize utility:

import multiprocessing
from multiprocessing import Pool
import numpy as np

def __processor_procedure(processor_index, ranges, f):

    global __48162789682__
    data = __48162789682__
    
    __processed = {}
    
    for index in ranges[processor_index]:
        _out = f(data[index])
        __processed[index] = _out

    return __processed



def Parallelize(data, f, num_cores=None):

    num_subjects = len(data)
    if num_cores is None:
        num_cores = multiprocessing.cpu_count()
    
    num_cores = min(num_cores, num_subjects, multiprocessing.cpu_count())
    
    cores_to_use = num_cores - 1
    range_dict = {}
    for i in range(cores_to_use):
        range_dict[i] = np.arange(i*int(num_subjects/(cores_to_use)), (i+1)*int(num_subjects/(cores_to_use)))
    range_dict[cores_to_use] = np.arange(num_subjects - num_subjects%cores_to_use, num_subjects)
    
    assert(num_cores == len(range_dict))
    
    pass_data = []
    
    for core_index in range(num_cores):
        pass_data.append((core_index, range_dict, f))
    
    global __48162789682__
    __48162789682__ = data
    
    with Pool(num_cores) as p:
        _returns = p.starmap(__processor_procedure, pass_data)
    
    del __48162789682__
    
    final_dict = {}
    for d in _returns:
        final_dict = {**final_dict, **d}
    
    return np.array(list(final_dict.values())).reshape(len(data),-1)

As in eigengame, Parallelize consists of a main function, Parallelize(), and a helper function, __processor_procedure(), which the main function calls.

The main function Parallelize() takes three arguments, one of them optional:

- `data`: an array (NumPy or otherwise; anything that supports len() and integer indexing is accepted) containing the items we want to process.

- `f`: a function that takes a single item from `data` and returns a processed version of it.

- `num_cores`: an optional parameter specifying how many cores to use. If it is unspecified, the machine's full core count is used; in any case it is capped at min(number of cores on the machine, length of `data`).

The first step is to determine how many cores to use if none was given:

num_subjects = len(data)
if num_cores is None: #If the number of cores is unspecified, get the maximum number of cores available
    num_cores = multiprocessing.cpu_count()

#The number of cores we use cannot exceed the number of subjects or the number of cores on the machine
num_cores = min(num_cores, num_subjects, multiprocessing.cpu_count())
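This clamping logic can be checked in isolation. Below is a minimal sketch; `resolve_num_cores` and its `available` parameter are hypothetical names introduced here for illustration, not part of the source:

```python
import multiprocessing

def resolve_num_cores(requested, num_subjects, available=None):
    # Hypothetical helper mirroring the clamping logic above
    if available is None:
        available = multiprocessing.cpu_count()
    if requested is None:
        # Unspecified: start from every core on the machine
        requested = available
    # Never exceed the subject count or the machine's core count
    return min(requested, num_subjects, available)

print(resolve_num_cores(None, 4, available=16))   # 4: only 4 subjects
print(resolve_num_cores(64, 1000, available=16))  # 16: capped at the machine
```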

Next is a more complex operation. We create a dictionary with processor core indices as keys, and arrays containing indices of items/subjects in the data parameter as values:

| Key (core index) | Value |
| --- | --- |
| 0 | [0, 1, 2, 3, 4] |
| 1 | [5, 6, 7, 8, 9] |
| ... | ... |
| 99 | [495, 496, 497, 498, 499] |

The idea is to partition the indices of the data array into roughly equal sub-arrays and assign a sub-array to each processor core. Each core is then responsible for the items at the indices in its sub-array.

The code is shown below:

cores_to_use = num_cores - 1
range_dict = {}
for i in range(cores_to_use):
    range_dict[i] = np.arange(i*int(num_subjects/(cores_to_use)), (i+1)*int(num_subjects/(cores_to_use)))
range_dict[cores_to_use] = np.arange(num_subjects - num_subjects%cores_to_use, num_subjects)
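To see what this partition looks like concretely, the same loop can be run standalone. `build_ranges` is a hypothetical wrapper added here for illustration:

```python
import numpy as np

def build_ranges(num_subjects, num_cores):
    # Standalone copy of the partitioning loop in Parallelize()
    cores_to_use = num_cores - 1
    range_dict = {}
    for i in range(cores_to_use):
        range_dict[i] = np.arange(i * int(num_subjects / cores_to_use),
                                  (i + 1) * int(num_subjects / cores_to_use))
    # The last core picks up whatever the modulo leaves over
    range_dict[cores_to_use] = np.arange(num_subjects - num_subjects % cores_to_use,
                                         num_subjects)
    return range_dict

for core, idx in build_ranges(11, 3).items():
    print(core, idx.tolist())
# 0 [0, 1, 2, 3, 4]
# 1 [5, 6, 7, 8, 9]
# 2 [10]
```

Two edge cases worth knowing: when num_subjects divides evenly by cores_to_use, the last core receives an empty range, and when num_cores is 1, cores_to_use is 0 and the modulo raises a ZeroDivisionError.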

Next, we build a list called pass_data holding the per-worker arguments for the helper function: each entry is a tuple of a core's index, the full range_dict, and the function f.

pass_data = []
    
for core_index in range(num_cores):
    pass_data.append((core_index, range_dict, f))

Next comes a trick. A problem that came up during testing was that the data array was often too large to be copied efficiently into pass_data. The workaround: copy data into a global variable, which then no longer needs to be passed explicitly. Thus, I created a global variable with a random name, in the hope that it would not clash with any existing global. (Note that this relies on the fork start method, where child processes inherit the parent's memory; under the spawn start method, the default on Windows and on macOS since Python 3.8, the workers would not see this global.)

global __48162789682__
__48162789682__ = data

Once that is done, we spin up a pool of workers and hand each one its argument tuple along with the helper function that processes its share of the data.

#Initialize a pool of workers and distribute the work among them
with Pool(num_cores) as p:
    _returns = p.starmap(__processor_procedure, pass_data)

Let's take a look at the helper function.

def __processor_procedure(processor_index, ranges, f):
  
    '''
    Reference the global variable defined in Parallelize(). This is a "hacky" trick to grab the input data without 
    directly passing a copy of it to each function, which would waste time and resources
    '''
    global __48162789682__
    data = __48162789682__
    
    #Create the dictionary of outputs
    __processed = {}
    
    #If one passes the "processor_index" variable to the dictionary "ranges" as a key, the value received is an array
    #of element indices that specify which elements in the data the function must process
    for index in ranges[processor_index]:
        #Process the data by running it through the f() function
        _out = f(data[index])
        __processed[index] = _out

    return __processed
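The helper's behavior is easy to verify without a pool. Below is a sketch in which the data is passed explicitly instead of read from the global; `processor_procedure` is a hypothetical stand-in for the real helper:

```python
import numpy as np

def processor_procedure(processor_index, ranges, data, f):
    # Same loop as __processor_procedure, but taking `data` as a
    # parameter instead of reading the module-level global
    processed = {}
    for index in ranges[processor_index]:
        processed[index] = f(data[index])
    return processed

data = np.arange(6)
ranges = {0: np.arange(0, 3), 1: np.arange(3, 6)}

# Worker 1 handles indices 3..5 and returns {3: 30, 4: 40, 5: 50}
out = processor_procedure(1, ranges, data, lambda x: x * 10)
print(sorted(int(k) for k in out))
```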

Once each worker finishes its batch of items/subjects, the per-worker return values are collected into the _returns list (one dictionary per worker). All that remains is to massage this list into a more user-friendly form, after which Parallelize() concludes:

#Merge the per-worker dictionaries, then convert to a NumPy array
final_dict = {}
for d in _returns:
    final_dict = {**final_dict, **d}

return np.array(list(final_dict.values())).reshape(len(data),-1)
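The merge can be traced with two hypothetical worker returns for four items:

```python
import numpy as np

# Hypothetical per-worker returns for 4 items split across 2 workers;
# starmap preserves input order, so worker 0's dictionary comes first
_returns = [{0: 10, 1: 11}, {2: 12, 3: 13}]

final_dict = {}
for d in _returns:
    final_dict = {**final_dict, **d}

# Dictionary insertion order is 0, 1, 2, 3, so values come out in
# subject order; reshape gives one row per input item
result = np.array(list(final_dict.values())).reshape(4, -1)
print(result.tolist())  # [[10], [11], [12], [13]]
```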