
Documentation for Parallelize

Frequently in our line of work, for loops take longer than needed to run because they do not take advantage of every processor core. For example, consider the following loop:

processed_data = []
for subject in census:
   subject_data = some_function(subject)
   processed_data.append(subject_data)

Snippets such as these pop up frequently in neurophysiological work, e.g. when running masking operations. If our subject count runs into the thousands, we would have to twiddle our thumbs and make small talk for a while. That is because Python does not automatically recruit multiple cores for you. When we have 100+ cores at our disposal, we are not making the most of our resources! This is where the Parallelize utility comes into play. Parallelize will enlist the help of every available core while reducing code clutter. With Parallelize, the above loop reduces to:

processed_data = Parallelize(census, some_function)
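
For a quick end-to-end sketch, here is what a call might look like. The module name, the fake census data, and the masking-style function below are hypothetical stand-ins for illustration only:

import numpy as np
from parallelize import Parallelize   #assuming the utility is saved in a module named parallelize.py

#Hypothetical stand-in data: one small array per "subject"
census = [np.random.rand(64, 64) for _ in range(1000)]

#Hypothetical stand-in for a masking operation
def some_function(subject):
    return subject * (subject > 0.5)

processed_data = Parallelize(census, some_function)
print(processed_data.shape)   #(1000, 4096)

Note that the final reshape inside Parallelize flattens each subject's output into a single row, so each (64, 64) result comes back as a row of 4096 values.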

Significant time savings are achieved this way. Empirically, we have reduced the runtime for a loop from 7 minutes to less than 20 seconds. Let's dive into the details.

Below is the full implementation of the Parallelize utility:

import multiprocessing
from multiprocessing import Pool
import numpy as np

def __processor_procedure(processor_index, ranges, f):

    '''
    Reference the global variable defined in Parallelize(). This is a "hacky" trick to grab the input data without 
    directly passing a copy of it to each function, which would waste time and resources
    '''
    global __48162789682__
    data = __48162789682__
    
    #Create the dictionary of outputs
    __processed = {}
    
    #Indexing the "ranges" dictionary with this worker's "processor_index" gives an array of element indices
    #specifying which elements in the data this worker must process
    for index in ranges[processor_index]:
        #Process the data by running it through the f() function
        _out = f(data[index])
        __processed[index] = _out

    return __processed



def Parallelize(data, f, num_cores=None):
    
    
    #Get the number of "subjects" i.e. the length of the data
    num_subjects = len(data)
    if num_cores is None: #If the number of cores is unspecified, get the maximum number of cores available
        num_cores = multiprocessing.cpu_count()
    
    #The number of cores we use cannot exceed the number of subjects or the number of cores actually available
    num_cores = min(num_cores, num_subjects, multiprocessing.cpu_count())
    
    
    #Creating a dictionary that has the core indices as keys and ranges of data element indices as the values.
    #The first num_cores - 1 cores each get an equal contiguous block of indices; the last core picks up the remainder.
    cores_to_use = num_cores - 1
    range_dict = {}
    if cores_to_use == 0:
        #With a single core there is nothing to split up; hand it every index
        range_dict[0] = np.arange(num_subjects)
    else:
        for i in range(cores_to_use):
            range_dict[i] = np.arange(i*int(num_subjects/cores_to_use), (i+1)*int(num_subjects/cores_to_use))
        range_dict[cores_to_use] = np.arange(num_subjects - num_subjects%cores_to_use, num_subjects)
    
    #Checking that there is an entry of the dictionary for every core that we will use
    assert(num_cores == len(range_dict))
    
    #Creating a list of tuples. Each tuple will serve as the arguments passed to the __processor_procedure helper function
    pass_data = []
    
    for core_index in range(num_cores):
        pass_data.append((core_index, range_dict, f))
    
    #Create a global variable and copy the data to it. The variable name was chosen in a way such that it hopefully would not
    #overwrite a user-defined variable. By creating such a global variable, each of the helper functions can easily access the
    #data without having it explicitly passed to them, which would waste time and resources
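    #Note: this trick assumes the worker processes inherit the parent's globals, which is what happens with the
    #fork start method (the default on Linux). With the "spawn" start method, the workers would not see this variable.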
    global __48162789682__
    __48162789682__ = data
    
    #Initialize a pool of workers. 
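    #p.starmap() calls __processor_procedure once per tuple in pass_data (one call per core) and collects the
    #returned dictionaries into a list, in the same order as pass_data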
    with Pool(num_cores) as p:
        _returns = p.starmap(__processor_procedure, pass_data)
    
    #Delete the global variable that we defined
    del __48162789682__
    
    #Merge the per-worker dictionaries in _returns into a single dictionary keyed by subject index
    final_dict = {}
    for d in _returns:
        final_dict.update(d)
    
    #Stack the outputs in subject order and flatten each subject's output into one row
    return np.array([final_dict[i] for i in sorted(final_dict)]).reshape(len(data), -1)
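
To make the index-splitting step concrete, here is a small sketch of the ranges that the range_dict construction produces for a hypothetical run with 10 subjects on 4 cores (the numbers are chosen purely for illustration):

import numpy as np

num_subjects, num_cores = 10, 4              #illustrative values only
cores_to_use = num_cores - 1                 #3 cores get an equal block...
chunk = int(num_subjects / cores_to_use)     #...of 3 indices each
for i in range(cores_to_use):
    print(i, np.arange(i*chunk, (i+1)*chunk))
print(cores_to_use, np.arange(num_subjects - num_subjects % cores_to_use, num_subjects))

#Output:
#0 [0 1 2]
#1 [3 4 5]
#2 [6 7 8]
#3 [9]

The last core simply picks up whatever indices are left over after the equal-sized blocks are handed out.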